diff --git a/pkg/controllers/manager.go b/pkg/controllers/manager.go index 136fe656..62cc9e22 100644 --- a/pkg/controllers/manager.go +++ b/pkg/controllers/manager.go @@ -62,17 +62,7 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. clusterInformers.Cluster().V1beta2().ManagedClusterSetBindings(), clusterInformers.Cluster().V1beta1().Placements(), clusterInformers.Cluster().V1beta1().PlacementDecisions(), - scheduler, - controllerContext.EventRecorder, recorder, - ) - - schedulingControllerResync := scheduling.NewSchedulingControllerResync( - clusterClient, - clusterInformers.Cluster().V1().ManagedClusters(), - clusterInformers.Cluster().V1beta2().ManagedClusterSets(), - clusterInformers.Cluster().V1beta2().ManagedClusterSetBindings(), - clusterInformers.Cluster().V1beta1().Placements(), - clusterInformers.Cluster().V1beta1().PlacementDecisions(), + clusterInformers.Cluster().V1alpha1().AddOnPlacementScores(), scheduler, controllerContext.EventRecorder, recorder, ) @@ -80,7 +70,6 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. go clusterInformers.Start(ctx.Done()) go schedulingController.Run(ctx, 1) - go schedulingControllerResync.Run(ctx, 1) <-ctx.Done() return nil diff --git a/pkg/controllers/scheduling/cluster_event_handler.go b/pkg/controllers/scheduling/cluster_event_handler.go index 65a52c7c..7432a3a7 100644 --- a/pkg/controllers/scheduling/cluster_event_handler.go +++ b/pkg/controllers/scheduling/cluster_event_handler.go @@ -4,26 +4,17 @@ import ( "fmt" "reflect" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" cache "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - clusterlisterv1beta1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1" - clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2" - clusterapiv1 "open-cluster-management.io/api/cluster/v1" - clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2" ) type clusterEventHandler struct { - clusterSetLister clusterlisterv1beta2.ManagedClusterSetLister - clusterSetBindingLister clusterlisterv1beta2.ManagedClusterSetBindingLister - placementLister clusterlisterv1beta1.PlacementLister - enqueuePlacementFunc enqueuePlacementFunc + enqueuer *enqueuer } func (h *clusterEventHandler) OnAdd(obj interface{}) { - h.onChange(obj) + h.enqueuer.enqueueCluster(obj) } func (h *clusterEventHandler) OnUpdate(oldObj, newObj interface{}) { @@ -31,7 +22,7 @@ func (h *clusterEventHandler) OnUpdate(oldObj, newObj interface{}) { if !ok { return } - h.onChange(newObj) + h.enqueuer.enqueueCluster(newObj) if oldObj == nil { return @@ -43,65 +34,17 @@ func (h *clusterEventHandler) OnUpdate(oldObj, newObj interface{}) { // if the cluster labels changes, process the original clusterset if !reflect.DeepEqual(newCluster.Labels, oldCluster.Labels) { - h.onChange(oldCluster) + h.enqueuer.enqueueCluster(oldCluster) } } func (h *clusterEventHandler) OnDelete(obj interface{}) { switch t := obj.(type) { case *clusterapiv1.ManagedCluster: - h.onChange(obj) + h.enqueuer.enqueueCluster(obj) case cache.DeletedFinalStateUnknown: - h.onChange(t.Obj) + h.enqueuer.enqueueCluster(t.Obj) default: utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) } } - -func (h *clusterEventHandler) onChange(obj interface{}) { - cluster, ok := obj.(metav1.Object) - if !ok { - utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) - return - } - - clusterSetNames, err := h.getClusterSetNames(cluster) - if err != nil { - klog.V(4).Infof("Unable to get clusterset of cluster %q: %v", cluster.GetName(), err) - return - } - - // skip cluster belongs to no clusterset - if len(clusterSetNames) == 0 { - return - } - - for _, clusterSetName := range clusterSetNames { - // enqueue placements which might be impacted - err = enqueuePlacementsByClusterSet( - clusterSetName, - h.clusterSetBindingLister, - h.placementLister, - h.enqueuePlacementFunc, - ) - if err != nil { - klog.Errorf("Unable to enqueue placements with access to clusterset %q: %v", clusterSetName, err) - } - } -} - -// getClusterSetName returns the name of the clusterset the cluster belongs to. It also checks the existence -// of the clusterset. -func (h *clusterEventHandler) getClusterSetNames(cluster metav1.Object) ([]string, error) { - clusterSetNames := []string{} - clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster.(*clusterapiv1.ManagedCluster), h.clusterSetLister) - if err != nil { - return clusterSetNames, err - } - - for _, cs := range clusterSets { - clusterSetNames = append(clusterSetNames, cs.Name) - } - - return clusterSetNames, nil -} diff --git a/pkg/controllers/scheduling/cluster_event_handler_test.go b/pkg/controllers/scheduling/cluster_event_handler_test.go index 43f1e9ea..38c78c4e 100644 --- a/pkg/controllers/scheduling/cluster_event_handler_test.go +++ b/pkg/controllers/scheduling/cluster_event_handler_test.go @@ -1,7 +1,7 @@ package scheduling import ( - "fmt" + "k8s.io/client-go/util/workqueue" "strings" "testing" @@ -93,19 +93,24 @@ func TestOnClusterChange(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) + syncCtx := testinghelpers.NewFakeSyncContext(t, "fake") + q := newEnqueuer( + syncCtx.Queue(), + clusterInformerFactory.Cluster().V1().ManagedClusters(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(), + clusterInformerFactory.Cluster().V1beta1().Placements(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(), + ) queuedKeys := sets.NewString() - handler := &clusterEventHandler{ - clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(), - clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(), - placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(), - enqueuePlacementFunc: func(namespace, name string) { - queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name)) - }, + fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) { + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + queuedKeys.Insert(key) } + q.enqueuePlacementFunc = fakeEnqueuePlacement - handler.onChange(c.obj) + q.enqueueCluster(c.obj) expectedQueuedKeys := sets.NewString(c.queuedKeys...) if !queuedKeys.Equal(expectedQueuedKeys) { t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ",")) @@ -247,16 +252,24 @@ func TestOnClusterUpdate(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) + syncCtx := testinghelpers.NewFakeSyncContext(t, "fake") + q := newEnqueuer( + syncCtx.Queue(), + clusterInformerFactory.Cluster().V1().ManagedClusters(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(), + clusterInformerFactory.Cluster().V1beta1().Placements(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(), + ) queuedKeys := sets.NewString() + fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) { + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + queuedKeys.Insert(key) + } + q.enqueuePlacementFunc = fakeEnqueuePlacement handler := &clusterEventHandler{ - clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(), - clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(), - placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(), - enqueuePlacementFunc: func(namespace, name string) { - queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name)) - }, + enqueuer: q, } handler.OnUpdate(c.oldObj, c.newObj) @@ -339,16 +352,24 @@ func TestOnClusterDelete(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) + syncCtx := testinghelpers.NewFakeSyncContext(t, "fake") + q := newEnqueuer( + syncCtx.Queue(), + clusterInformerFactory.Cluster().V1().ManagedClusters(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(), + clusterInformerFactory.Cluster().V1beta1().Placements(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(), + ) queuedKeys := sets.NewString() + fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) { + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + queuedKeys.Insert(key) + } + q.enqueuePlacementFunc = fakeEnqueuePlacement handler := &clusterEventHandler{ - clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(), - clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(), - placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(), - enqueuePlacementFunc: func(namespace, name string) { - queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name)) - }, + enqueuer: q, } handler.OnDelete(c.obj) diff --git a/pkg/controllers/scheduling/clusterset_event_handler.go b/pkg/controllers/scheduling/clusterset_event_handler.go deleted file mode 100644 index bff653b0..00000000 --- a/pkg/controllers/scheduling/clusterset_event_handler.go +++ /dev/null @@ -1,97 +0,0 @@ -package scheduling - -import ( - "fmt" - - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - cache "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - - errorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers" - clusterlisterv1beta1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1" - clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2" - - clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2" -) - -type clusterSetEventHandler struct { - clusterSetBindingLister clusterlisterv1beta2.ManagedClusterSetBindingLister - placementLister clusterlisterv1beta1.PlacementLister - enqueuePlacementFunc enqueuePlacementFunc -} - -func (h *clusterSetEventHandler) OnAdd(obj interface{}) { - h.onChange(obj) -} - -func (h *clusterSetEventHandler) OnUpdate(oldObj, newObj interface{}) { - // ignore Update event -} - -func (h *clusterSetEventHandler) OnDelete(obj interface{}) { - var clusterSetName string - switch t := obj.(type) { - case *clusterapiv1beta2.ManagedClusterSet: - clusterSetName = t.Name - case cache.DeletedFinalStateUnknown: - clusterSet, ok := t.Obj.(metav1.Object) - if !ok { - utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) - return - } - clusterSetName = clusterSet.GetName() - default: - utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) - return - } - - err := enqueuePlacementsByClusterSet(clusterSetName, h.clusterSetBindingLister, - h.placementLister, h.enqueuePlacementFunc) - if err != nil { - klog.Errorf("Unable to enqueue placements by clusterset %q: %v", clusterSetName, err) - } -} - -func (h *clusterSetEventHandler) onChange(obj interface{}) { - accessor, err := meta.Accessor(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("error accessing metadata: %w", err)) - return - } - - clusterSetName := accessor.GetName() - err = enqueuePlacementsByClusterSet(clusterSetName, h.clusterSetBindingLister, - h.placementLister, h.enqueuePlacementFunc) - if err != nil { - klog.Errorf("Unable to enqueue placements by clusterset %q: %v", clusterSetName, err) - } -} - -// enqueuePlacementsByClusterSet enqueues placements that might be impacted by the given clusterset into -// controller queue for further reconciliation -func enqueuePlacementsByClusterSet( - clusterSetName string, - clusterSetBindingLister clusterlisterv1beta2.ManagedClusterSetBindingLister, - placementLister clusterlisterv1beta1.PlacementLister, - enqueuePlacementFunc enqueuePlacementFunc, -) error { - bindings, err := clusterSetBindingLister.List(labels.Everything()) - if err != nil { - return err - } - - errs := []error{} - for _, binding := range bindings { - if binding.Name != clusterSetName { - continue - } - - if err := enqueuePlacementsByClusterSetBinding(binding.Namespace, binding.Name, placementLister, enqueuePlacementFunc); err != nil { - errs = append(errs, err) - } - } - return errorhelpers.NewMultiLineAggregate(errs) -} diff --git a/pkg/controllers/scheduling/clusterset_event_handler_test.go b/pkg/controllers/scheduling/clusterset_event_handler_test.go deleted file mode 100644 index 964ad623..00000000 --- a/pkg/controllers/scheduling/clusterset_event_handler_test.go +++ /dev/null @@ -1,219 +0,0 @@ -package scheduling - -import ( - "fmt" - "strings" - "testing" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" - cache "k8s.io/client-go/tools/cache" - - clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake" - clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2" - testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing" -) - -func TestEnqueuePlacementsByClusterSet(t *testing.T) { - cases := []struct { - name string - clusterSetName string - initObjs []runtime.Object - queuedKeys []string - }{ - { - name: "enqueue placements in a namespace", - clusterSetName: "clusterset1", - initObjs: []runtime.Object{ - testinghelpers.NewClusterSet("clusterset1").Build(), - testinghelpers.NewClusterSet("clusterset2").Build(), - testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - testinghelpers.NewClusterSetBinding("ns1", "clusterset2"), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - testinghelpers.NewPlacement("ns1", "placement2").WithClusterSets("clusterset2").Build(), - }, - queuedKeys: []string{ - "ns1/placement1", - }, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) - - queuedKeys := sets.NewString() - err := enqueuePlacementsByClusterSet( - c.clusterSetName, - clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(), - clusterInformerFactory.Cluster().V1beta1().Placements().Lister(), - func(namespace, name string) { - queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name)) - }, - ) - if err != nil { - t.Errorf("unexpected err: %v", err) - } - - expectedQueuedKeys := sets.NewString(c.queuedKeys...) - if !queuedKeys.Equal(expectedQueuedKeys) { - t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ",")) - } - }) - } -} - -func TestOnClusterSetAdd(t *testing.T) { - cases := []struct { - name string - obj interface{} - initObjs []runtime.Object - queuedKeys []string - }{ - { - name: "invalid object type", - obj: "invalid object type", - }, - { - name: "clusterset selector type is LegacyClusterSetLabel", - obj: testinghelpers.NewClusterSet("clusterset1").Build(), - initObjs: []runtime.Object{ - testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - }, - queuedKeys: []string{ - "ns1/placement1", - }, - }, - { - name: "clusterset selector type is LabelSelector", - obj: testinghelpers.NewClusterSet("clusterset1").WithClusterSelector(clusterapiv1beta2.ManagedClusterSelector{ - SelectorType: clusterapiv1beta2.LabelSelector, - LabelSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "cloud": "Amazon", - }, - }, - }).Build(), - initObjs: []runtime.Object{ - testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - }, - queuedKeys: []string{ - "ns1/placement1", - }, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) - - queuedKeys := sets.NewString() - handler := &clusterSetEventHandler{ - clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(), - placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(), - enqueuePlacementFunc: func(namespace, name string) { - queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name)) - }, - } - - handler.OnAdd(c.obj) - expectedQueuedKeys := sets.NewString(c.queuedKeys...) - if !queuedKeys.Equal(expectedQueuedKeys) { - t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ",")) - } - }) - } -} - -func TestOnClusterSetDelete(t *testing.T) { - cases := []struct { - name string - obj interface{} - initObjs []runtime.Object - queuedKeys []string - }{ - { - name: "invalid object type", - obj: "invalid object type", - }, - { - name: "clusterset selector type is LegacyClusterSetLabel", - obj: testinghelpers.NewClusterSet("clusterset1").Build(), - initObjs: []runtime.Object{ - testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - }, - queuedKeys: []string{ - "ns1/placement1", - }, - }, - { - name: "clusterset selector type is LabelSelector", - obj: testinghelpers.NewClusterSet("clusterset1").WithClusterSelector(clusterapiv1beta2.ManagedClusterSelector{ - SelectorType: clusterapiv1beta2.LabelSelector, - LabelSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "cloud": "Amazon", - }, - }, - }).Build(), - initObjs: []runtime.Object{ - testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - }, - queuedKeys: []string{ - "ns1/placement1", - }, - }, - { - name: "tombstone", - obj: cache.DeletedFinalStateUnknown{ - Obj: testinghelpers.NewClusterSet("clusterset1").Build(), - }, - initObjs: []runtime.Object{ - testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - }, - queuedKeys: []string{ - "ns1/placement1", - }, - }, - { - name: "tombstone with invalid object type", - obj: cache.DeletedFinalStateUnknown{ - Obj: "invalid object type", - }, - initObjs: []runtime.Object{ - testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - }, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) - - queuedKeys := sets.NewString() - handler := &clusterSetEventHandler{ - clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(), - placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(), - enqueuePlacementFunc: func(namespace, name string) { - queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name)) - }, - } - - handler.OnDelete(c.obj) - expectedQueuedKeys := sets.NewString(c.queuedKeys...) - if !queuedKeys.Equal(expectedQueuedKeys) { - t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ",")) - } - }) - } -} diff --git a/pkg/controllers/scheduling/clustersetbinding_event_handler.go b/pkg/controllers/scheduling/clustersetbinding_event_handler.go deleted file mode 100644 index 23cba9db..00000000 --- a/pkg/controllers/scheduling/clustersetbinding_event_handler.go +++ /dev/null @@ -1,92 +0,0 @@ -package scheduling - -import ( - "fmt" - - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/sets" - cache "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - - clusterlisterv1beta1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1" - clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2" - - clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2" -) - -type clusterSetBindingEventHandler struct { - clusterSetLister clusterlisterv1beta2.ManagedClusterSetLister - placementLister clusterlisterv1beta1.PlacementLister - enqueuePlacementFunc enqueuePlacementFunc -} - -func (h *clusterSetBindingEventHandler) OnAdd(obj interface{}) { - h.onChange(obj) -} - -func (h *clusterSetBindingEventHandler) OnUpdate(oldObj, newObj interface{}) { - h.onChange(newObj) -} - -func (h *clusterSetBindingEventHandler) OnDelete(obj interface{}) { - switch t := obj.(type) { - case *clusterapiv1beta2.ManagedClusterSetBinding: - h.onChange(obj) - case cache.DeletedFinalStateUnknown: - h.onChange(t.Obj) - default: - utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) - } -} - -func (h *clusterSetBindingEventHandler) onChange(obj interface{}) { - clusterSetBinding, ok := obj.(metav1.Object) - if !ok { - utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) - return - } - - namespace := clusterSetBinding.GetNamespace() - clusterSetBindingName := clusterSetBinding.GetName() - - _, err := h.clusterSetLister.Get(clusterSetBindingName) - // skip if the clusterset does not exist - if errors.IsNotFound(err) { - return - } - if err != nil { - klog.Errorf("Unable to get clusterset %q: %v", clusterSetBindingName, err) - return - } - - err = enqueuePlacementsByClusterSetBinding(namespace, clusterSetBindingName, h.placementLister, h.enqueuePlacementFunc) - if err != nil { - klog.Errorf("Unable to enqueue placements by clustersetbinding %s/%s: %v", namespace, clusterSetBindingName, err) - } -} - -// enqueuePlacementsByClusterSetBinding enqueues placements that might be impacted by a particular clustersetbinding -// into controller queue for further reconciliation -func enqueuePlacementsByClusterSetBinding( - namespace, clusterSetBindingName string, - placementLister clusterlisterv1beta1.PlacementLister, - enqueuePlacementFunc enqueuePlacementFunc, -) error { - placements, err := placementLister.Placements(namespace).List(labels.Everything()) - if err != nil { - return err - } - for _, placement := range placements { - // ignore placement whose .spec.clusterSets is specified but does no include this - // particular clusterset. - clusterSets := sets.NewString(placement.Spec.ClusterSets...) - if clusterSets.Len() != 0 && !clusterSets.Has(clusterSetBindingName) { - continue - } - enqueuePlacementFunc(placement.Namespace, placement.Name) - } - return nil -} diff --git a/pkg/controllers/scheduling/clustersetbinding_event_handler_test.go b/pkg/controllers/scheduling/clustersetbinding_event_handler_test.go deleted file mode 100644 index e9a38f9b..00000000 --- a/pkg/controllers/scheduling/clustersetbinding_event_handler_test.go +++ /dev/null @@ -1,193 +0,0 @@ -package scheduling - -import ( - "fmt" - "strings" - "testing" - - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" - cache "k8s.io/client-go/tools/cache" - - clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake" - testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing" -) - -func TestOnClusterSetBindingChange(t *testing.T) { - cases := []struct { - name string - obj interface{} - initObjs []runtime.Object - queuedKeys []string - }{ - { - name: "invalid resource type", - obj: "invalid resource type", - initObjs: []runtime.Object{ - testinghelpers.NewPlacement("ns1", "placement1").Build(), - testinghelpers.NewPlacement("ns2", "placement2").Build(), - }, - }, - { - name: "clusterset does not exist", - obj: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - initObjs: []runtime.Object{ - testinghelpers.NewPlacement("ns1", "placement1").Build(), - testinghelpers.NewPlacement("ns2", "placement2").Build(), - }, - }, - { - name: "on clustersetbinding change", - obj: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - initObjs: []runtime.Object{ - testinghelpers.NewClusterSet("clusterset1").Build(), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - testinghelpers.NewPlacement("ns2", "placement2").Build(), - }, - queuedKeys: []string{ - "ns1/placement1", - }, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) - - queuedKeys := sets.NewString() - handler := &clusterSetBindingEventHandler{ - clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(), - placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(), - enqueuePlacementFunc: func(namespace, name string) { - queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name)) - }, - } - - handler.onChange(c.obj) - expectedQueuedKeys := sets.NewString(c.queuedKeys...) - if !queuedKeys.Equal(expectedQueuedKeys) { - t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ",")) - } - }) - } -} - -func TestEnqueuePlacementsByClusterSetBinding(t *testing.T) { - cases := []struct { - name string - namespace string - clusterSetBindingName string - initObjs []runtime.Object - queuedKeys []string - }{ - { - name: "enqueue placements by clusterSetBinding", - namespace: "ns1", - clusterSetBindingName: "clusterset1", - initObjs: []runtime.Object{ - testinghelpers.NewPlacement("ns1", "placement1").Build(), - testinghelpers.NewPlacement("ns1", "placement2").WithClusterSets("clusterset2").Build(), - testinghelpers.NewPlacement("ns2", "placement3").WithClusterSets("clusterset1").Build(), - }, - queuedKeys: []string{ - "ns1/placement1", - }, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) - - queuedKeys := sets.NewString() - err := enqueuePlacementsByClusterSetBinding( - c.namespace, - c.clusterSetBindingName, - clusterInformerFactory.Cluster().V1beta1().Placements().Lister(), - func(namespace, name string) { - queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name)) - }, - ) - if err != nil { - t.Errorf("unexpected err: %v", err) - } - - expectedQueuedKeys := sets.NewString(c.queuedKeys...) - if !queuedKeys.Equal(expectedQueuedKeys) { - t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ",")) - } - }) - } -} - -func TestOnClusterSetBindingDelete(t *testing.T) { - cases := []struct { - name string - obj interface{} - initObjs []runtime.Object - queuedKeys []string - }{ - { - name: "invalid object type", - obj: "invalid object type", - }, - { - name: "clusterset", - obj: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - initObjs: []runtime.Object{ - testinghelpers.NewClusterSet("clusterset1").Build(), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - }, - queuedKeys: []string{ - "ns1/placement1", - }, - }, - { - name: "tombstone", - obj: cache.DeletedFinalStateUnknown{ - Obj: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), - }, - initObjs: []runtime.Object{ - testinghelpers.NewClusterSet("clusterset1").Build(), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - }, - queuedKeys: []string{ - "ns1/placement1", - }, - }, - { - name: "tombstone with invalid object type", - obj: cache.DeletedFinalStateUnknown{ - Obj: "invalid object type", - }, - initObjs: []runtime.Object{ - testinghelpers.NewClusterSet("clusterset1").Build(), - testinghelpers.NewPlacement("ns1", "placement1").Build(), - }, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) - - queuedKeys := sets.NewString() - handler := &clusterSetBindingEventHandler{ - clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(), - placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(), - enqueuePlacementFunc: func(namespace, name string) { - queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name)) - }, - } - - handler.OnDelete(c.obj) - expectedQueuedKeys := sets.NewString(c.queuedKeys...) - if !queuedKeys.Equal(expectedQueuedKeys) { - t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ",")) - } - }) - } -} diff --git a/pkg/controllers/scheduling/enqueue.go b/pkg/controllers/scheduling/enqueue.go new file mode 100644 index 00000000..1205d0fb --- /dev/null +++ b/pkg/controllers/scheduling/enqueue.go @@ -0,0 +1,252 @@ +package scheduling + +import ( + "fmt" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" + clusterinformerv1beta1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta1" + clusterinformerv1beta2 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta2" + clusterlisterv1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1" + clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2" + clusterapiv1 "open-cluster-management.io/api/cluster/v1" + clusterapiv1beta1 "open-cluster-management.io/api/cluster/v1beta1" + clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2" +) + +const ( + // anyClusterSet is an index value for indexPlacementByClusterSet for placement + // not setting any clusterset + anyClusterSet = "*" + placementsByClusterSetBinding = "placementsByClusterSet" + clustersetBindingsByClusterSet = "clustersetBindingsByClusterSet" + placementsByScore = "placementsByScore" +) + +type enqueuer struct { + queue workqueue.RateLimitingInterface + enqueuePlacementFunc func(obj interface{}, queue workqueue.RateLimitingInterface) + + clusterLister clusterlisterv1.ManagedClusterLister + clusterSetLister clusterlisterv1beta2.ManagedClusterSetLister + placementIndexer cache.Indexer + clusterSetBindingIndexer cache.Indexer +} + +func newEnqueuer( + queue workqueue.RateLimitingInterface, + clusterInformer clusterinformerv1.ManagedClusterInformer, + clusterSetInformer clusterinformerv1beta2.ManagedClusterSetInformer, + placementInformer clusterinformerv1beta1.PlacementInformer, + clusterSetBindingInformer clusterinformerv1beta2.ManagedClusterSetBindingInformer) *enqueuer { + placementInformer.Informer().AddIndexers(cache.Indexers{ + placementsByScore: indexPlacementsByScore, + placementsByClusterSetBinding: indexPlacementByClusterSetBinding, + }) + + clusterSetBindingInformer.Informer().AddIndexers(cache.Indexers{ + clustersetBindingsByClusterSet: indexClusterSetBindingByClusterSet, + }) + + return &enqueuer{ + queue: queue, + enqueuePlacementFunc: enqueuePlacement, + clusterLister: clusterInformer.Lister(), + clusterSetLister: clusterSetInformer.Lister(), + placementIndexer: placementInformer.Informer().GetIndexer(), + clusterSetBindingIndexer: clusterSetBindingInformer.Informer().GetIndexer(), + } +} + +func enqueuePlacement(obj interface{}, queue workqueue.RateLimitingInterface) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + queue.Add(key) +} + +func (e *enqueuer) enqueueClusterSetBinding(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + + namespace, _, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + runtime.HandleError(err) + return + } + + // enqueue all placement that ref to the binding + objs, err := e.placementIndexer.ByIndex(placementsByClusterSetBinding, key) + if err != nil { + runtime.HandleError(err) + return + } + + anyObjs, err := e.placementIndexer.ByIndex(placementsByClusterSetBinding, fmt.Sprintf("%s/%s", namespace, anyClusterSet)) + if err != nil { + runtime.HandleError(err) + return + } + + objs = append(objs, anyObjs...) + + for _, o := range objs { + placement := o.(*clusterapiv1beta1.Placement) + klog.V(4).Infof("enqueue placement %s/%s, because of binding %s", placement.Namespace, placement.Name, key) + e.enqueuePlacementFunc(placement, e.queue) + } +} + +func (e *enqueuer) enqueueClusterSet(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + + objs, err := e.clusterSetBindingIndexer.ByIndex(clustersetBindingsByClusterSet, key) + if err != nil { + runtime.HandleError(err) + return + } + + for _, o := range objs { + clusterSetBinding := o.(*clusterapiv1beta2.ManagedClusterSetBinding) + klog.V(4).Infof("enqueue clustersetbinding %s/%s, because of clusterset %s", clusterSetBinding.Namespace, clusterSetBinding.Name, key) + e.enqueueClusterSetBinding(clusterSetBinding) + } +} + +func (e *enqueuer) enqueueCluster(obj interface{}) { + cluster, ok := obj.(*clusterapiv1.ManagedCluster) + if !ok { + runtime.HandleError(fmt.Errorf("obj %T is not a ManagedCluster", obj)) + return + } + + clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster, e.clusterSetLister) + if err != nil { + klog.V(4).Infof("Unable to get clusterSets of cluster %q: %w", cluster.GetName(), err) + return + } + + for _, clusterSet := range clusterSets { + klog.V(4).Infof("enqueue clusterSet %s, because of cluster %s", clusterSet.Name, cluster.Name) + e.enqueueClusterSet(clusterSet) + } +} + +func (e *enqueuer) enqueuePlacementScore(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + runtime.HandleError(err) + return + } + + objs, err := e.placementIndexer.ByIndex(placementsByScore, name) + if err != nil { + runtime.HandleError(err) + return + } + + // filter the namespace of placement based on cluster. Find all related clustersetbinding + // to the cluster at first. Only enqueue placement when its namespace is in the valid namespaces + // of clustersetbindings. + filteredBindingNamespaces := sets.NewString() + cluster, err := e.clusterLister.Get(namespace) + if err != nil { + klog.V(4).Infof("Unable to get cluster %s: %w", namespace, err) + } + + clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster, e.clusterSetLister) + if err != nil { + klog.V(4).Infof("Unable to get clusterSets of cluster %q: %w", cluster.GetName(), err) + return + } + + for _, clusterset := range clusterSets { + bindingObjs, err := e.clusterSetBindingIndexer.ByIndex(clustersetBindingsByClusterSet, clusterset.Name) + if err != nil { + klog.V(4).Infof("Unable to get clusterSetBindings of clusterset %q: %w", clusterset.Name, err) + continue + } + + for _, bindingObj := range bindingObjs { + binding := bindingObj.(*clusterapiv1beta2.ManagedClusterSetBinding) + filteredBindingNamespaces.Insert(binding.Namespace) + } + } + + for _, o := range objs { + placement := o.(*clusterapiv1beta1.Placement) + if filteredBindingNamespaces.Has(placement.Namespace) { + klog.V(4).Infof("enqueue placement %s/%s, because of score %s", placement.Namespace, placement.Name, key) + e.enqueuePlacementFunc(placement, e.queue) + } + } +} + +func indexPlacementByClusterSetBinding(obj interface{}) ([]string, error) { + placement, ok := obj.(*clusterapiv1beta1.Placement) + if !ok { + return []string{}, fmt.Errorf("obj %T is not a Placement", obj) + } + + clustersets := placement.Spec.ClusterSets + if len(clustersets) == 0 { + return []string{fmt.Sprintf("%s/%s", placement.Namespace, anyClusterSet)}, nil + } + + var bindings []string + for _, clusterset := range clustersets { + bindings = append(bindings, fmt.Sprintf("%s/%s", placement.Namespace, clusterset)) + } + + return bindings, nil +} + +func indexPlacementsByScore(obj interface{}) ([]string, error) { + placement, ok := obj.(*clusterapiv1beta1.Placement) + if !ok { + return []string{}, fmt.Errorf("obj %T is not a Placement", obj) + } + + var keys []string + for _, config := range placement.Spec.PrioritizerPolicy.Configurations { + if config.ScoreCoordinate == nil { + continue + } + if config.ScoreCoordinate.Type != clusterapiv1beta1.ScoreCoordinateTypeAddOn { + continue + } + if config.ScoreCoordinate.AddOn == nil { + continue + } + keys = append(keys, config.ScoreCoordinate.AddOn.ResourceName) + } + + return keys, nil +} + +func indexClusterSetBindingByClusterSet(obj interface{}) ([]string, error) { + binding, ok := obj.(*clusterapiv1beta2.ManagedClusterSetBinding) + if !ok { + return []string{}, fmt.Errorf("obj %T is not a ManagedClusterSetBinding", obj) + } + + return []string{binding.Spec.ClusterSet}, nil +} diff --git a/pkg/controllers/scheduling/enqueue_test.go b/pkg/controllers/scheduling/enqueue_test.go new file mode 100644 index 00000000..f9fb6c42 --- /dev/null +++ b/pkg/controllers/scheduling/enqueue_test.go @@ -0,0 +1,393 @@ +package scheduling + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned" + clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake" + clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions" + clusterapiv1 "open-cluster-management.io/api/cluster/v1" + clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" + clusterapiv1beta1 "open-cluster-management.io/api/cluster/v1beta1" + clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2" + testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing" + "strings" + "testing" + "time" +) + +func newClusterInformerFactory(clusterClient clusterclient.Interface, objects ...runtime.Object) clusterinformers.SharedInformerFactory { + clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10) + + clusterInformerFactory.Cluster().V1beta1().Placements().Informer().AddIndexers(cache.Indexers{ + placementsByScore: indexPlacementsByScore, + placementsByClusterSetBinding: indexPlacementByClusterSetBinding, + }) + + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Informer().AddIndexers(cache.Indexers{ + clustersetBindingsByClusterSet: indexClusterSetBindingByClusterSet, + }) + + clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore() + clusterSetStore := clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Informer().GetStore() + clusterSetBindingStore := clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Informer().GetStore() + placementStore := clusterInformerFactory.Cluster().V1beta1().Placements().Informer().GetStore() + placementDecisionStore := clusterInformerFactory.Cluster().V1beta1().PlacementDecisions().Informer().GetStore() + addOnPlacementStore := clusterInformerFactory.Cluster().V1alpha1().AddOnPlacementScores().Informer().GetStore() + + for _, obj := range objects { + switch obj.(type) { + case *clusterapiv1.ManagedCluster: + clusterStore.Add(obj) + case *clusterapiv1beta2.ManagedClusterSet: + clusterSetStore.Add(obj) + case *clusterapiv1beta2.ManagedClusterSetBinding: + clusterSetBindingStore.Add(obj) + case *clusterapiv1beta1.Placement: + placementStore.Add(obj) + case *clusterapiv1beta1.PlacementDecision: + placementDecisionStore.Add(obj) + case *clusterapiv1alpha1.AddOnPlacementScore: + addOnPlacementStore.Add(obj) + } + } + + return clusterInformerFactory +} + +func TestEnqueuePlacementsByClusterSet(t *testing.T) { + cases := []struct { + name string + clusterSet interface{} + initObjs []runtime.Object + queuedKeys []string + }{ + { + name: "enqueue placements in a namespace", + clusterSet: testinghelpers.NewClusterSet("clusterset1").Build(), + initObjs: []runtime.Object{ + testinghelpers.NewClusterSet("clusterset1").Build(), + testinghelpers.NewClusterSet("clusterset2").Build(), + testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + testinghelpers.NewClusterSetBinding("ns1", "clusterset2"), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + testinghelpers.NewPlacement("ns1", "placement2").WithClusterSets("clusterset2").Build(), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + { + name: "invalid object type", + clusterSet: "invalid object type", + }, + { + name: "clusterset selector type is LegacyClusterSetLabel", + clusterSet: testinghelpers.NewClusterSet("clusterset1").Build(), + initObjs: []runtime.Object{ + testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + { + name: "clusterset selector type is LabelSelector", + clusterSet: testinghelpers.NewClusterSet("clusterset1").WithClusterSelector(clusterapiv1beta2.ManagedClusterSelector{ + SelectorType: clusterapiv1beta2.LabelSelector, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "cloud": "Amazon", + }, + }, + }).Build(), + initObjs: []runtime.Object{ + testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + { + name: "clusterset selector type is LegacyClusterSetLabel", + clusterSet: testinghelpers.NewClusterSet("clusterset1").Build(), + initObjs: []runtime.Object{ + testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + { + name: "clusterset selector type is LabelSelector", + clusterSet: testinghelpers.NewClusterSet("clusterset1").WithClusterSelector(clusterapiv1beta2.ManagedClusterSelector{ + SelectorType: clusterapiv1beta2.LabelSelector, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "cloud": "Amazon", + }, + }, + }).Build(), + initObjs: []runtime.Object{ + testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + { + name: "tombstone", + clusterSet: cache.DeletedFinalStateUnknown{ + Key: "clusterset1", + Obj: testinghelpers.NewClusterSet("clusterset1").Build(), + }, + initObjs: []runtime.Object{ + testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + { + name: "tombstone with invalid object type", + clusterSet: cache.DeletedFinalStateUnknown{ + Obj: "invalid object type", + }, + initObjs: []runtime.Object{ + testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) + + syncCtx := testinghelpers.NewFakeSyncContext(t, "fake") + q := newEnqueuer( + syncCtx.Queue(), + clusterInformerFactory.Cluster().V1().ManagedClusters(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(), + clusterInformerFactory.Cluster().V1beta1().Placements(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(), + ) + queuedKeys := sets.NewString() + fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) { + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + queuedKeys.Insert(key) + } + q.enqueuePlacementFunc = fakeEnqueuePlacement + q.enqueueClusterSet(c.clusterSet) + + expectedQueuedKeys := sets.NewString(c.queuedKeys...) + if !queuedKeys.Equal(expectedQueuedKeys) { + t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ",")) + } + }) + } +} + +func TestEnqueuePlacementsByClusterSetBinding(t *testing.T) { + cases := []struct { + name string + namespace string + clusterSetBinding interface{} + initObjs []runtime.Object + queuedKeys []string + }{ + { + name: "enqueue placements by clusterSetBinding", + namespace: "ns1", + clusterSetBinding: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + initObjs: []runtime.Object{ + testinghelpers.NewPlacement("ns1", "placement1").Build(), + testinghelpers.NewPlacement("ns1", "placement2").WithClusterSets("clusterset2").Build(), + testinghelpers.NewPlacement("ns2", "placement3").WithClusterSets("clusterset1").Build(), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + { + name: "invalid resource type", + clusterSetBinding: "invalid resource type", + initObjs: []runtime.Object{ + testinghelpers.NewPlacement("ns1", "placement1").Build(), + testinghelpers.NewPlacement("ns2", "placement2").Build(), + }, + }, + { + name: "on clustersetbinding change", + clusterSetBinding: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + initObjs: []runtime.Object{ + testinghelpers.NewClusterSet("clusterset1").Build(), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + testinghelpers.NewPlacement("ns2", "placement2").Build(), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + { + name: "clusterset", + clusterSetBinding: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + initObjs: []runtime.Object{ + testinghelpers.NewClusterSet("clusterset1").Build(), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + { + name: "tombstone", + clusterSetBinding: cache.DeletedFinalStateUnknown{ + Key: "ns1/clusterset1", + Obj: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + }, + initObjs: []runtime.Object{ + testinghelpers.NewClusterSet("clusterset1").Build(), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + { + name: "tombstone with invalid object type", + clusterSetBinding: cache.DeletedFinalStateUnknown{ + Obj: "invalid object type", + }, + initObjs: []runtime.Object{ + testinghelpers.NewClusterSet("clusterset1").Build(), + testinghelpers.NewPlacement("ns1", "placement1").Build(), + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) + + syncCtx := testinghelpers.NewFakeSyncContext(t, "fake") + q := newEnqueuer( + syncCtx.Queue(), + clusterInformerFactory.Cluster().V1().ManagedClusters(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(), + clusterInformerFactory.Cluster().V1beta1().Placements(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(), + ) + queuedKeys := sets.NewString() + fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) { + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + queuedKeys.Insert(key) + } + q.enqueuePlacementFunc = fakeEnqueuePlacement + q.enqueueClusterSetBinding(c.clusterSetBinding) + + expectedQueuedKeys := sets.NewString(c.queuedKeys...) + if !queuedKeys.Equal(expectedQueuedKeys) { + t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ",")) + } + }) + } +} + +func TestEnqueuePlacementsByScore(t *testing.T) { + cases := []struct { + name string + namespace string + score interface{} + initObjs []runtime.Object + queuedKeys []string + }{ + { + name: "ensueue score", + score: testinghelpers.NewAddOnPlacementScore("cluster1", "score1").Build(), + initObjs: []runtime.Object{ + testinghelpers.NewPlacement("ns1", "placement1").WithScoreCoordinateAddOn("score1", "cpu", 1).Build(), + testinghelpers.NewPlacement("ns2", "placement2").WithScoreCoordinateAddOn("score2", "cpu", 1).Build(), + testinghelpers.NewPlacement("ns3", "placement3").WithScoreCoordinateAddOn("score1", "cpu", 1).Build(), + testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, "clusterset1").Build(), + testinghelpers.NewClusterSet("clusterset1").Build(), + testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + testinghelpers.NewClusterSetBinding("ns3", "clusterset1"), + }, + queuedKeys: []string{ + "ns1/placement1", + "ns3/placement3", + }, + }, + { + name: "only enqueue score with filtered placement", + score: testinghelpers.NewAddOnPlacementScore("cluster1", "score1").Build(), + initObjs: []runtime.Object{ + testinghelpers.NewPlacement("ns1", "placement1").WithScoreCoordinateAddOn("score1", "cpu", 1).Build(), + testinghelpers.NewPlacement("ns2", "placement2").WithScoreCoordinateAddOn("score2", "cpu", 1).Build(), + testinghelpers.NewPlacement("ns3", "placement3").WithScoreCoordinateAddOn("score1", "cpu", 1).Build(), + testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, "clusterset1").Build(), + testinghelpers.NewClusterSet("clusterset1").Build(), + testinghelpers.NewClusterSetBinding("ns1", "clusterset2"), + testinghelpers.NewClusterSetBinding("ns3", "clusterset1"), + }, + queuedKeys: []string{ + "ns3/placement3", + }, + }, + { + name: "tombstone", + score: cache.DeletedFinalStateUnknown{ + Key: "cluster1/score1", + Obj: testinghelpers.NewAddOnPlacementScore("cluster1", "score1").Build(), + }, + initObjs: []runtime.Object{ + testinghelpers.NewPlacement("ns1", "placement1").WithScoreCoordinateAddOn("score1", "cpu", 1).Build(), + testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, "clusterset1").Build(), + testinghelpers.NewClusterSet("clusterset1").Build(), + testinghelpers.NewClusterSetBinding("ns1", "clusterset1"), + }, + queuedKeys: []string{ + "ns1/placement1", + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) + + syncCtx := testinghelpers.NewFakeSyncContext(t, "fake") + q := newEnqueuer( + syncCtx.Queue(), + clusterInformerFactory.Cluster().V1().ManagedClusters(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(), + clusterInformerFactory.Cluster().V1beta1().Placements(), + clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(), + ) + queuedKeys := sets.NewString() + fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) { + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + queuedKeys.Insert(key) + } + q.enqueuePlacementFunc = fakeEnqueuePlacement + q.enqueuePlacementScore(c.score) + + expectedQueuedKeys := sets.NewString(c.queuedKeys...) + if !queuedKeys.Equal(expectedQueuedKeys) { + t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ",")) + } + }) + } +} diff --git a/pkg/controllers/scheduling/scheduling_controller.go b/pkg/controllers/scheduling/scheduling_controller.go index 14ee37b0..6e5ef156 100644 --- a/pkg/controllers/scheduling/scheduling_controller.go +++ b/pkg/controllers/scheduling/scheduling_controller.go @@ -26,6 +26,7 @@ import ( "k8s.io/klog/v2" clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned" clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" + clusterinformerv1alpha1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1alpha1" clusterinformerv1beta1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta1" clusterinformerv1beta2 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta2" @@ -60,7 +61,6 @@ type schedulingController struct { clusterSetBindingLister clusterlisterv1beta2.ManagedClusterSetBindingLister placementLister clusterlisterv1beta1.PlacementLister placementDecisionLister clusterlisterv1beta1.PlacementDecisionLister - enqueuePlacementFunc enqueuePlacementFunc scheduler Scheduler recorder kevents.EventRecorder } @@ -73,13 +73,13 @@ func NewSchedulingController( clusterSetBindingInformer clusterinformerv1beta2.ManagedClusterSetBindingInformer, placementInformer clusterinformerv1beta1.PlacementInformer, placementDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer, + placementScoreInformer clusterinformerv1alpha1.AddOnPlacementScoreInformer, scheduler Scheduler, recorder events.Recorder, krecorder kevents.EventRecorder, ) factory.Controller { syncCtx := factory.NewSyncContext(schedulingControllerName, recorder) - enqueuePlacementFunc := func(namespace, name string) { - syncCtx.Queue().Add(fmt.Sprintf("%s/%s", namespace, name)) - } + + enQueuer := newEnqueuer(syncCtx.Queue(), clusterInformer, clusterSetInformer, placementInformer, clusterSetBindingInformer) // build controller c := &schedulingController{ @@ -89,7 +89,6 @@ func NewSchedulingController( clusterSetBindingLister: clusterSetBindingInformer.Lister(), placementLister: placementInformer.Lister(), placementDecisionLister: placementDecisionInformer.Lister(), - enqueuePlacementFunc: enqueuePlacementFunc, recorder: krecorder, scheduler: scheduler, } @@ -101,10 +100,7 @@ func NewSchedulingController( // controller booting. But that should not cause any problem because all existing // placements will be enqueued by the controller anyway when booting. clusterInformer.Informer().AddEventHandler(&clusterEventHandler{ - clusterSetLister: clusterSetInformer.Lister(), - clusterSetBindingLister: clusterSetBindingInformer.Lister(), - placementLister: placementInformer.Lister(), - enqueuePlacementFunc: enqueuePlacementFunc, + enqueuer: enQueuer, }) // setup event handler for clusterset informer @@ -113,10 +109,12 @@ func NewSchedulingController( // informers/listers of clustersetbinding/placement are synced during controller // booting. But that should not cause any problem because all existing placements will // be enqueued by the controller anyway when booting. - clusterSetInformer.Informer().AddEventHandler(&clusterSetEventHandler{ - clusterSetBindingLister: clusterSetBindingInformer.Lister(), - placementLister: placementInformer.Lister(), - enqueuePlacementFunc: enqueuePlacementFunc, + clusterSetInformer.Informer().AddEventHandler(&cache.ResourceEventHandlerFuncs{ + AddFunc: enQueuer.enqueueClusterSet, + UpdateFunc: func(oldObj, newObj interface{}) { + enQueuer.enqueueClusterSet(newObj) + }, + DeleteFunc: enQueuer.enqueueClusterSet, }) // setup event handler for clustersetbinding informer @@ -125,10 +123,21 @@ func NewSchedulingController( // the informers/listers of clusterset/placement are synced during controller booting. But // that should not cause any problem because all existing placements will be enqueued by // the controller anyway when booting. - clusterSetBindingInformer.Informer().AddEventHandler(&clusterSetBindingEventHandler{ - clusterSetLister: clusterSetInformer.Lister(), - placementLister: placementInformer.Lister(), - enqueuePlacementFunc: enqueuePlacementFunc, + clusterSetBindingInformer.Informer().AddEventHandler(&cache.ResourceEventHandlerFuncs{ + AddFunc: enQueuer.enqueueClusterSetBinding, + UpdateFunc: func(oldObj, newObj interface{}) { + enQueuer.enqueueClusterSetBinding(newObj) + }, + DeleteFunc: enQueuer.enqueueClusterSetBinding, + }) + + // setup event handler for placementscore informer + placementScoreInformer.Informer().AddEventHandler(&cache.ResourceEventHandlerFuncs{ + AddFunc: enQueuer.enqueuePlacementScore, + UpdateFunc: func(oldObj, newObj interface{}) { + enQueuer.enqueuePlacementScore(newObj) + }, + DeleteFunc: enQueuer.enqueuePlacementScore, }) return factory.New(). @@ -153,84 +162,11 @@ func NewSchedulingController( } return false }, placementDecisionInformer.Informer()). - WithBareInformers(clusterInformer.Informer(), clusterSetInformer.Informer(), clusterSetBindingInformer.Informer()). + WithBareInformers(clusterInformer.Informer(), clusterSetInformer.Informer(), clusterSetBindingInformer.Informer(), placementScoreInformer.Informer()). WithSync(c.sync). ToController(schedulingControllerName, recorder) } -func NewSchedulingControllerResync( - clusterClient clusterclient.Interface, - clusterInformer clusterinformerv1.ManagedClusterInformer, - clusterSetInformer clusterinformerv1beta2.ManagedClusterSetInformer, - clusterSetBindingInformer clusterinformerv1beta2.ManagedClusterSetBindingInformer, - placementInformer clusterinformerv1beta1.PlacementInformer, - placementDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer, - scheduler Scheduler, - recorder events.Recorder, krecorder kevents.EventRecorder, -) factory.Controller { - syncCtx := factory.NewSyncContext(schedulingControllerResyncName, recorder) - enqueuePlacementFunc := func(namespace, name string) { - syncCtx.Queue().Add(fmt.Sprintf("%s/%s", namespace, name)) - } - - // build controller - c := &schedulingController{ - clusterClient: clusterClient, - clusterLister: clusterInformer.Lister(), - clusterSetLister: clusterSetInformer.Lister(), - clusterSetBindingLister: clusterSetBindingInformer.Lister(), - placementLister: placementInformer.Lister(), - placementDecisionLister: placementDecisionInformer.Lister(), - enqueuePlacementFunc: enqueuePlacementFunc, - recorder: krecorder, - scheduler: scheduler, - } - - return factory.New(). - WithSyncContext(syncCtx). - WithSync(c.resync). - ResyncEvery(ResyncInterval). - ToController(schedulingControllerResyncName, recorder) - -} - -// Resync the placement which depends on AddOnPlacementScore periodically -func (c *schedulingController) resync(ctx context.Context, syncCtx factory.SyncContext) error { - queueKey := syncCtx.QueueKey() - klog.V(4).Infof("Resync placement %q", queueKey) - - if queueKey == "key" { - placements, err := c.placementLister.List(labels.Everything()) - if err != nil { - return err - } - - for _, placement := range placements { - for _, config := range placement.Spec.PrioritizerPolicy.Configurations { - if config.ScoreCoordinate != nil && config.ScoreCoordinate.Type == clusterapiv1beta1.ScoreCoordinateTypeAddOn { - key, _ := cache.MetaNamespaceKeyFunc(placement) - klog.V(4).Infof("Requeue placement %s", key) - syncCtx.Queue().Add(key) - break - } - } - } - - return nil - } else { - placement, err := c.getPlacement(queueKey) - if errors.IsNotFound(err) { - // no work if placement is deleted - return nil - } - if err != nil { - return err - } - // Do not pass syncCtx to syncPlacement, since don't want to requeue the placement when resyncing the placement. - return c.syncPlacement(ctx, nil, placement) - } -} - func (c *schedulingController) sync(ctx context.Context, syncCtx factory.SyncContext) error { queueKey := syncCtx.QueueKey() klog.V(4).Infof("Reconciling placement %q", queueKey) diff --git a/pkg/controllers/scheduling/scheduling_controller_test.go b/pkg/controllers/scheduling/scheduling_controller_test.go index 66fc228b..18778bb1 100644 --- a/pkg/controllers/scheduling/scheduling_controller_test.go +++ b/pkg/controllers/scheduling/scheduling_controller_test.go @@ -201,7 +201,7 @@ func TestSchedulingController_sync(t *testing.T) { t.Run(c.name, func(t *testing.T) { c.initObjs = append(c.initObjs, c.placement) clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) s := &testScheduler{result: c.scheduleResult} ctrl := schedulingController{ @@ -258,7 +258,7 @@ func TestGetValidManagedClusterSetBindings(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) ctrl := &schedulingController{ clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(), @@ -333,7 +333,7 @@ func TestGetValidManagedClusterSets(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) ctrl := &schedulingController{ clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(), @@ -461,7 +461,7 @@ func TestGetAvailableClusters(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) ctrl := &schedulingController{ clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), @@ -754,7 +754,7 @@ func TestBind(t *testing.T) { }, ) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) + clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...) s := &testScheduler{}