
Commit 31de11f
WIP: List nodes through informer in every iteration
ingvagabund committed Mar 3, 2020
1 parent df51018 commit 31de11f
Showing 3 changed files with 32 additions and 38 deletions.
30 changes: 21 additions & 9 deletions pkg/descheduler/descheduler.go
@@ -22,6 +22,7 @@ import (
"k8s.io/klog"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/client"
@@ -56,18 +57,29 @@ func RunDeschedulerStrategies(rs *options.DeschedulerServer, deschedulerPolicy *
}

stopChannel := make(chan struct{})
nodes, err := nodeutil.ReadyNodes(rs.Client, rs.NodeSelector, stopChannel)
if err != nil {
return err
}

if len(nodes) <= 1 {
klog.V(1).Infof("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
return nil
}
sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()

sharedInformerFactory.Start(stopChannel)
sharedInformerFactory.WaitForCacheSync(stopChannel)

nodePodCount := utils.InitializeNodePodCount(nodes)
wait.Until(func() {
nodes, err := nodeutil.ReadyNodes(rs.Client, nodeInformer, rs.NodeSelector, stopChannel)
if err != nil {
klog.V(1).Infof("Unable to get ready nodes: %v", err)
close(stopChannel)
return
}

if len(nodes) <= 1 {
klog.V(1).Infof("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
close(stopChannel)
return
}

nodePodCount := utils.InitializeNodePodCount(nodes)

strategies.RemoveDuplicatePods(rs, deschedulerPolicy.Strategies["RemoveDuplicates"], evictionPolicyGroupVersion, nodes, nodePodCount)
strategies.LowNodeUtilization(rs, deschedulerPolicy.Strategies["LowNodeUtilization"], evictionPolicyGroupVersion, nodes, nodePodCount)
strategies.RemovePodsViolatingInterPodAntiAffinity(rs, deschedulerPolicy.Strategies["RemovePodsViolatingInterPodAntiAffinity"], evictionPolicyGroupVersion, nodes, nodePodCount)
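Taken together, this hunk moves node discovery inside the descheduling loop: the shared informer factory is created, started, and synced once, and every iteration of wait.Until re-lists nodes from the informer cache instead of reusing a single list taken before the loop started. Below is a minimal, self-contained sketch of that pattern, not the repository code; the names runDeschedulingLoop and interval are illustrative, and the strategy calls are elided.

package descheduler

import (
    "time"

    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/klog"
)

// runDeschedulingLoop sketches the informer-backed loop: start the factory
// once, then read the current node set from the cache on every tick.
func runDeschedulingLoop(client kubernetes.Interface, interval time.Duration) {
    stopChannel := make(chan struct{})

    // Zero resync period: the informer keeps its cache current via the watch.
    sharedInformerFactory := informers.NewSharedInformerFactory(client, 0)
    // Requesting the lister registers the node informer with the factory
    // before Start is called.
    nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()

    sharedInformerFactory.Start(stopChannel)
    sharedInformerFactory.WaitForCacheSync(stopChannel)

    wait.Until(func() {
        // Every iteration sees nodes added or removed since the loop began.
        nodes, err := nodeLister.List(labels.Everything())
        if err != nil {
            klog.V(1).Infof("unable to list nodes: %v", err)
            close(stopChannel)
            return
        }
        if len(nodes) <= 1 {
            klog.V(1).Infof("cluster has %d node(s), nothing to deschedule", len(nodes))
            close(stopChannel)
            return
        }
        // ... run the eviction strategies against nodes here ...
    }, interval, stopChannel)
}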
33 changes: 5 additions & 28 deletions pkg/descheduler/node/node.go
@@ -17,34 +17,27 @@ limitations under the License.
package node

import (
"time"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"sigs.k8s.io/descheduler/pkg/utils"
)

// ReadyNodes returns ready nodes irrespective of whether they are
// schedulable or not.
func ReadyNodes(client clientset.Interface, nodeSelector string, stopChannel <-chan struct{}) ([]*v1.Node, error) {
func ReadyNodes(client clientset.Interface, nodeInformer coreinformers.NodeInformer, nodeSelector string, stopChannel <-chan struct{}) ([]*v1.Node, error) {
ns, err := labels.Parse(nodeSelector)
if err != nil {
return []*v1.Node{}, err
}

var nodes []*v1.Node
nl := GetNodeLister(client, stopChannel)
if nl != nil {
// err is defined above
if nodes, err = nl.List(ns); err != nil {
return []*v1.Node{}, err
}
// err is defined above
if nodes, err = nodeInformer.Lister().List(ns); err != nil {
return []*v1.Node{}, err
}

if len(nodes) == 0 {
@@ -74,22 +67,6 @@ func ReadyNodes(client clientset.Interface, nodeSelector string, stopChannel <-c
return readyNodes, nil
}

func GetNodeLister(client clientset.Interface, stopChannel <-chan struct{}) corelisters.NodeLister {
if stopChannel == nil {
return nil
}
listWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "nodes", v1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
nodeLister := corelisters.NewNodeLister(store)
reflector := cache.NewReflector(listWatcher, &v1.Node{}, store, time.Hour)
go reflector.Run(stopChannel)

// To give some time so that listing works, chosen randomly
time.Sleep(100 * time.Millisecond)

return nodeLister
}

// IsReady checks if the descheduler could run against given node.
func IsReady(node *v1.Node) bool {
for i := range node.Status.Conditions {
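The hunks above remove the hand-rolled GetNodeLister, which spun up its own reflector and then slept for 100ms in the hope that the initial list had landed, and list through the shared node informer's lister instead. A rough sketch of the new shape of the listing logic follows, assuming a simplified readiness check; readyNodesSketch is a hypothetical name, not the repository function.

package node

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    coreinformers "k8s.io/client-go/informers/core/v1"
)

// readyNodesSketch lists nodes from the informer cache and keeps only the
// ones whose NodeReady condition is true.
func readyNodesSketch(nodeInformer coreinformers.NodeInformer, nodeSelector string) ([]*v1.Node, error) {
    ns, err := labels.Parse(nodeSelector)
    if err != nil {
        return nil, err
    }

    // The lister serves from an in-memory cache kept current by a watch, so
    // there is no reflector to manage and no arbitrary sleep.
    nodes, err := nodeInformer.Lister().List(ns)
    if err != nil {
        return nil, err
    }

    readyNodes := make([]*v1.Node, 0, len(nodes))
    for _, node := range nodes {
        for _, cond := range node.Status.Conditions {
            if cond.Type == v1.NodeReady && cond.Status == v1.ConditionTrue {
                readyNodes = append(readyNodes, node)
                break
            }
        }
    }
    return readyNodes, nil
}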
7 changes: 6 additions & 1 deletion pkg/descheduler/node/node_test.go
@@ -21,6 +21,7 @@ import (

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/descheduler/test"
)
@@ -62,7 +63,11 @@ func TestReadyNodesWithNodeSelector(t *testing.T) {

fakeClient := fake.NewSimpleClientset(node1, node2)
nodeSelector := "type=compute"
nodes, _ := ReadyNodes(fakeClient, nodeSelector, nil)

sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()

nodes, _ := ReadyNodes(fakeClient, nodeInformer, nodeSelector, nil)

if nodes[0].Name != "node1" {
t.Errorf("Expected node1, got %s", nodes[0].Name)
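For reference, here is a sketch of a test setup that also starts and syncs the informer factory before calling ReadyNodes; a lister only serves objects once its informer has been started and its cache has synced. The test name and the inline node construction are illustrative and not part of this commit.

package node

import (
    "testing"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
)

func TestReadyNodesViaStartedInformer(t *testing.T) {
    // A ready node carrying the label the selector below asks for.
    node1 := &v1.Node{
        ObjectMeta: metav1.ObjectMeta{
            Name:   "node1",
            Labels: map[string]string{"type": "compute"},
        },
        Status: v1.NodeStatus{
            Conditions: []v1.NodeCondition{
                {Type: v1.NodeReady, Status: v1.ConditionTrue},
            },
        },
    }

    fakeClient := fake.NewSimpleClientset(node1)

    stopChannel := make(chan struct{})
    defer close(stopChannel)

    sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
    nodeInformer := sharedInformerFactory.Core().V1().Nodes()
    // Touch the informer so the factory knows about it before Start.
    nodeInformer.Informer()

    sharedInformerFactory.Start(stopChannel)
    sharedInformerFactory.WaitForCacheSync(stopChannel)

    nodes, err := ReadyNodes(fakeClient, nodeInformer, "type=compute", stopChannel)
    if err != nil {
        t.Fatalf("ReadyNodes returned an error: %v", err)
    }
    if len(nodes) != 1 || nodes[0].Name != "node1" {
        t.Errorf("expected only node1, got %v", nodes)
    }
}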
