Skip to content
This repository has been archived by the owner on Oct 18, 2024. It is now read-only.

Commit

Permalink
watch placementscore during scheduling (#89)
Browse files Browse the repository at this point in the history
* watch placementscore during scheduling

refactor eventhandler using index

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Filter placement when enqueue score

Signed-off-by: Jian Qiu <jqiu@redhat.com>

Signed-off-by: Jian Qiu <jqiu@redhat.com>
  • Loading branch information
qiujian16 authored Nov 29, 2022
1 parent fafcfbf commit 5d33dd9
Show file tree
Hide file tree
Showing 11 changed files with 729 additions and 796 deletions.
13 changes: 1 addition & 12 deletions pkg/controllers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,14 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
clusterInformers.Cluster().V1beta2().ManagedClusterSetBindings(),
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
scheduler,
controllerContext.EventRecorder, recorder,
)

schedulingControllerResync := scheduling.NewSchedulingControllerResync(
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
clusterInformers.Cluster().V1beta2().ManagedClusterSets(),
clusterInformers.Cluster().V1beta2().ManagedClusterSetBindings(),
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
clusterInformers.Cluster().V1alpha1().AddOnPlacementScores(),
scheduler,
controllerContext.EventRecorder, recorder,
)

go clusterInformers.Start(ctx.Done())

go schedulingController.Run(ctx, 1)
go schedulingControllerResync.Run(ctx, 1)

<-ctx.Done()
return nil
Expand Down
69 changes: 6 additions & 63 deletions pkg/controllers/scheduling/cluster_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,25 @@ import (
"fmt"
"reflect"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
cache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
clusterlisterv1beta1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2"

clusterapiv1 "open-cluster-management.io/api/cluster/v1"
clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2"
)

type clusterEventHandler struct {
clusterSetLister clusterlisterv1beta2.ManagedClusterSetLister
clusterSetBindingLister clusterlisterv1beta2.ManagedClusterSetBindingLister
placementLister clusterlisterv1beta1.PlacementLister
enqueuePlacementFunc enqueuePlacementFunc
enqueuer *enqueuer
}

func (h *clusterEventHandler) OnAdd(obj interface{}) {
h.onChange(obj)
h.enqueuer.enqueueCluster(obj)
}

func (h *clusterEventHandler) OnUpdate(oldObj, newObj interface{}) {
newCluster, ok := newObj.(*clusterapiv1.ManagedCluster)
if !ok {
return
}
h.onChange(newObj)
h.enqueuer.enqueueCluster(newObj)

if oldObj == nil {
return
Expand All @@ -43,65 +34,17 @@ func (h *clusterEventHandler) OnUpdate(oldObj, newObj interface{}) {

// if the cluster labels changes, process the original clusterset
if !reflect.DeepEqual(newCluster.Labels, oldCluster.Labels) {
h.onChange(oldCluster)
h.enqueuer.enqueueCluster(oldCluster)
}
}

func (h *clusterEventHandler) OnDelete(obj interface{}) {
switch t := obj.(type) {
case *clusterapiv1.ManagedCluster:
h.onChange(obj)
h.enqueuer.enqueueCluster(obj)
case cache.DeletedFinalStateUnknown:
h.onChange(t.Obj)
h.enqueuer.enqueueCluster(t.Obj)
default:
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
}
}

func (h *clusterEventHandler) onChange(obj interface{}) {
cluster, ok := obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}

clusterSetNames, err := h.getClusterSetNames(cluster)
if err != nil {
klog.V(4).Infof("Unable to get clusterset of cluster %q: %v", cluster.GetName(), err)
return
}

// skip cluster belongs to no clusterset
if len(clusterSetNames) == 0 {
return
}

for _, clusterSetName := range clusterSetNames {
// enqueue placements which might be impacted
err = enqueuePlacementsByClusterSet(
clusterSetName,
h.clusterSetBindingLister,
h.placementLister,
h.enqueuePlacementFunc,
)
if err != nil {
klog.Errorf("Unable to enqueue placements with access to clusterset %q: %v", clusterSetName, err)
}
}
}

// getClusterSetName returns the name of the clusterset the cluster belongs to. It also checks the existence
// of the clusterset.
func (h *clusterEventHandler) getClusterSetNames(cluster metav1.Object) ([]string, error) {
clusterSetNames := []string{}
clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster.(*clusterapiv1.ManagedCluster), h.clusterSetLister)
if err != nil {
return clusterSetNames, err
}

for _, cs := range clusterSets {
clusterSetNames = append(clusterSetNames, cs.Name)
}

return clusterSetNames, nil
}
69 changes: 45 additions & 24 deletions pkg/controllers/scheduling/cluster_event_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package scheduling

import (
"fmt"
"k8s.io/client-go/util/workqueue"
"strings"
"testing"

Expand Down Expand Up @@ -93,19 +93,24 @@ func TestOnClusterChange(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)

syncCtx := testinghelpers.NewFakeSyncContext(t, "fake")
q := newEnqueuer(
syncCtx.Queue(),
clusterInformerFactory.Cluster().V1().ManagedClusters(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
clusterInformerFactory.Cluster().V1beta1().Placements(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(),
)
queuedKeys := sets.NewString()
handler := &clusterEventHandler{
clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(),
placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
enqueuePlacementFunc: func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
queuedKeys.Insert(key)
}
q.enqueuePlacementFunc = fakeEnqueuePlacement

handler.onChange(c.obj)
q.enqueueCluster(c.obj)
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
Expand Down Expand Up @@ -247,16 +252,24 @@ func TestOnClusterUpdate(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)

syncCtx := testinghelpers.NewFakeSyncContext(t, "fake")
q := newEnqueuer(
syncCtx.Queue(),
clusterInformerFactory.Cluster().V1().ManagedClusters(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
clusterInformerFactory.Cluster().V1beta1().Placements(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(),
)
queuedKeys := sets.NewString()
fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
queuedKeys.Insert(key)
}
q.enqueuePlacementFunc = fakeEnqueuePlacement
handler := &clusterEventHandler{
clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(),
placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
enqueuePlacementFunc: func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
enqueuer: q,
}

handler.OnUpdate(c.oldObj, c.newObj)
Expand Down Expand Up @@ -339,16 +352,24 @@ func TestOnClusterDelete(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)

syncCtx := testinghelpers.NewFakeSyncContext(t, "fake")
q := newEnqueuer(
syncCtx.Queue(),
clusterInformerFactory.Cluster().V1().ManagedClusters(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
clusterInformerFactory.Cluster().V1beta1().Placements(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(),
)
queuedKeys := sets.NewString()
fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
queuedKeys.Insert(key)
}
q.enqueuePlacementFunc = fakeEnqueuePlacement
handler := &clusterEventHandler{
clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(),
placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
enqueuePlacementFunc: func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
enqueuer: q,
}

handler.OnDelete(c.obj)
Expand Down
97 changes: 0 additions & 97 deletions pkg/controllers/scheduling/clusterset_event_handler.go

This file was deleted.

Loading

0 comments on commit 5d33dd9

Please sign in to comment.