diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go
index a515556dc8..0f19d0d391 100644
--- a/pkg/descheduler/descheduler.go
+++ b/pkg/descheduler/descheduler.go
@@ -22,6 +22,7 @@ import (
 	"k8s.io/klog"
 
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/informers"
 	"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
 	"sigs.k8s.io/descheduler/pkg/api"
 	"sigs.k8s.io/descheduler/pkg/descheduler/client"
@@ -46,28 +47,38 @@ func Run(rs *options.DeschedulerServer) error {
 		return fmt.Errorf("deschedulerPolicy is nil")
 	}
 
-	return RunDeschedulerStrategies(rs, deschedulerPolicy)
-}
-
-func RunDeschedulerStrategies(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy) error {
 	evictionPolicyGroupVersion, err := eutils.SupportEviction(rs.Client)
 	if err != nil || len(evictionPolicyGroupVersion) == 0 {
 		return err
 	}
 
 	stopChannel := make(chan struct{})
-	nodes, err := nodeutil.ReadyNodes(rs.Client, rs.NodeSelector, stopChannel)
-	if err != nil {
-		return err
-	}
+	return RunDeschedulerStrategies(rs, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel)
+}
 
-	if len(nodes) <= 1 {
-		klog.V(1).Infof("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
-		return nil
-	}
+func RunDeschedulerStrategies(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, stopChannel chan struct{}) error {
+	sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
+	nodeInformer := sharedInformerFactory.Core().V1().Nodes()
+
+	sharedInformerFactory.Start(stopChannel)
+	sharedInformerFactory.WaitForCacheSync(stopChannel)
 
-	nodePodCount := utils.InitializeNodePodCount(nodes)
 	wait.Until(func() {
+		nodes, err := nodeutil.ReadyNodes(rs.Client, nodeInformer, rs.NodeSelector, stopChannel)
+		if err != nil {
+			klog.V(1).Infof("Unable to get ready nodes: %v", err)
+			close(stopChannel)
+			return
+		}
+
+		if len(nodes) <= 1 {
+			klog.V(1).Infof("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
+			close(stopChannel)
+			return
+		}
+
+		nodePodCount := utils.InitializeNodePodCount(nodes)
+
 		strategies.RemoveDuplicatePods(rs, deschedulerPolicy.Strategies["RemoveDuplicates"], evictionPolicyGroupVersion, nodes, nodePodCount)
 		strategies.LowNodeUtilization(rs, deschedulerPolicy.Strategies["LowNodeUtilization"], evictionPolicyGroupVersion, nodes, nodePodCount)
 		strategies.RemovePodsViolatingInterPodAntiAffinity(rs, deschedulerPolicy.Strategies["RemovePodsViolatingInterPodAntiAffinity"], evictionPolicyGroupVersion, nodes, nodePodCount)
diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go
new file mode 100644
index 0000000000..150df4058e
--- /dev/null
+++ b/pkg/descheduler/descheduler_test.go
@@ -0,0 +1,92 @@
+package descheduler
+
+import (
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	fakeclientset "k8s.io/client-go/kubernetes/fake"
+	"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
+	"sigs.k8s.io/descheduler/pkg/api"
+	"sigs.k8s.io/descheduler/test"
+)
+
+func TestTaintsUpdated(t *testing.T) {
+	n1 := test.BuildTestNode("n1", 2000, 3000, 10)
+	n2 := test.BuildTestNode("n2", 2000, 3000, 10)
+
+	p1 := test.BuildTestPod(fmt.Sprintf("pod_1_%s", n1.Name), 200, 0, n1.Name)
+	p1.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
+		{},
+	}
+
+	client := fakeclientset.NewSimpleClientset(n1, n2, p1)
+	dp := &api.DeschedulerPolicy{
+		Strategies: api.StrategyList{
+			"RemovePodsViolatingNodeTaints": api.DeschedulerStrategy{
+				Enabled: true,
+			},
+		},
+	}
+
+	stopChannel := make(chan struct{})
+	defer close(stopChannel)
+
+	rs := options.NewDeschedulerServer()
+	rs.Client = client
+	rs.DeschedulingInterval = 100 * time.Millisecond
+	go func() {
+		err := RunDeschedulerStrategies(rs, dp, "v1beta1", stopChannel)
+		if err != nil {
+			t.Fatalf("Unable to run descheduler strategies: %v", err)
+		}
+	}()
+
+	// Wait for a few descheduling cycles and then verify that the only pod still exists.
+	time.Sleep(300 * time.Millisecond)
+	pods, err := client.CoreV1().Pods(p1.Namespace).List(metav1.ListOptions{})
+	if err != nil {
+		t.Errorf("Unable to list pods: %v", err)
+	}
+	if len(pods.Items) < 1 {
+		t.Errorf("The pod was evicted before the node was tainted")
+	}
+
+	n1WithTaint := n1.DeepCopy()
+	n1WithTaint.Spec.Taints = []v1.Taint{
+		{
+			Key:    "key",
+			Value:  "value",
+			Effect: v1.TaintEffectNoSchedule,
+		},
+	}
+
+	if _, err := client.CoreV1().Nodes().Update(n1WithTaint); err != nil {
+		t.Fatalf("Unable to update node: %v\n", err)
+	}
+
+	if err := wait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
+		// Getting the evicted pod directly results in a panic:
+		//pods, err := client.CoreV1().Pods(p1.Namespace).Get(p1.Name, metav1.GetOptions{})
+		// List is better, it does not panic.
+		// Though once the pod is evicted, List starts to error with "can't assign or convert v1beta1.Eviction into v1.Pod".
+		pods, err := client.CoreV1().Pods(p1.Namespace).List(metav1.ListOptions{})
+		if err == nil {
+			if len(pods.Items) > 0 {
+				return false, nil
+			}
+			return true, nil
+		}
+		if strings.Contains(err.Error(), "can't assign or convert v1beta1.Eviction into v1.Pod") {
+			return true, nil
+		}
+
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Unable to evict pod, node taint did not get propagated to descheduler strategies")
+	}
+}
diff --git a/pkg/descheduler/node/node.go b/pkg/descheduler/node/node.go
index 33d9ec5c9a..fcf8f1d54e 100644
--- a/pkg/descheduler/node/node.go
+++ b/pkg/descheduler/node/node.go
@@ -17,34 +17,27 @@ limitations under the License.
 package node
 
 import (
-	"time"
-
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
+	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
-	corelisters "k8s.io/client-go/listers/core/v1"
-	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog"
 	"sigs.k8s.io/descheduler/pkg/utils"
 )
 
 // ReadyNodes returns ready nodes irrespective of whether they are
 // schedulable or not.
-func ReadyNodes(client clientset.Interface, nodeSelector string, stopChannel <-chan struct{}) ([]*v1.Node, error) {
+func ReadyNodes(client clientset.Interface, nodeInformer coreinformers.NodeInformer, nodeSelector string, stopChannel <-chan struct{}) ([]*v1.Node, error) {
 	ns, err := labels.Parse(nodeSelector)
 	if err != nil {
 		return []*v1.Node{}, err
 	}
 
 	var nodes []*v1.Node
-	nl := GetNodeLister(client, stopChannel)
-	if nl != nil {
-		// err is defined above
-		if nodes, err = nl.List(ns); err != nil {
-			return []*v1.Node{}, err
-		}
+	// err is defined above
+	if nodes, err = nodeInformer.Lister().List(ns); err != nil {
+		return []*v1.Node{}, err
 	}
 
 	if len(nodes) == 0 {
@@ -74,22 +67,6 @@ func ReadyNodes(client clientset.Interface, nodeSelector string, stopChannel <-c
 	return readyNodes, nil
 }
 
-func GetNodeLister(client clientset.Interface, stopChannel <-chan struct{}) corelisters.NodeLister {
-	if stopChannel == nil {
-		return nil
-	}
-	listWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "nodes", v1.NamespaceAll, fields.Everything())
-	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
-	nodeLister := corelisters.NewNodeLister(store)
-	reflector := cache.NewReflector(listWatcher, &v1.Node{}, store, time.Hour)
-	go reflector.Run(stopChannel)
-
-	// To give some time so that listing works, chosen randomly
-	time.Sleep(100 * time.Millisecond)
-
-	return nodeLister
-}
-
 // IsReady checks if the descheduler could run against given node.
 func IsReady(node *v1.Node) bool {
 	for i := range node.Status.Conditions {
diff --git a/pkg/descheduler/node/node_test.go b/pkg/descheduler/node/node_test.go
index 88bc522f55..2502eea961 100644
--- a/pkg/descheduler/node/node_test.go
+++ b/pkg/descheduler/node/node_test.go
@@ -21,6 +21,7 @@ import (
 
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
 	"sigs.k8s.io/descheduler/test"
 )
@@ -62,7 +63,16 @@ func TestReadyNodesWithNodeSelector(t *testing.T) {
 
 	fakeClient := fake.NewSimpleClientset(node1, node2)
 	nodeSelector := "type=compute"
-	nodes, _ := ReadyNodes(fakeClient, nodeSelector, nil)
+
+	sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
+	nodeInformer := sharedInformerFactory.Core().V1().Nodes()
+
+	stopChannel := make(chan struct{}, 0)
+	sharedInformerFactory.Start(stopChannel)
+	sharedInformerFactory.WaitForCacheSync(stopChannel)
+	defer close(stopChannel)
+
+	nodes, _ := ReadyNodes(fakeClient, nodeInformer, nodeSelector, nil)
 
 	if nodes[0].Name != "node1" {
 		t.Errorf("Expected node1, got %s", nodes[0].Name)
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 5edf7ca417..4354c71329 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -24,6 +24,8 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
+	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/klog"
 	"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
@@ -93,7 +95,7 @@ func RcByNameContainer(name string, replicas int32, labels map[string]string, gr
 }
 
 // startEndToEndForLowNodeUtilization tests the lownode utilization strategy.
-func startEndToEndForLowNodeUtilization(clientset clientset.Interface) {
+func startEndToEndForLowNodeUtilization(clientset clientset.Interface, nodeInformer coreinformers.NodeInformer) {
 	var thresholds = make(deschedulerapi.ResourceThresholds)
 	var targetThresholds = make(deschedulerapi.ResourceThresholds)
 	thresholds[v1.ResourceMemory] = 20
@@ -108,7 +110,7 @@ func startEndToEndForLowNodeUtilization(clientset clientset.Interface) {
 		klog.Fatalf("%v", err)
 	}
 	stopChannel := make(chan struct{})
-	nodes, err := nodeutil.ReadyNodes(clientset, "", stopChannel)
+	nodes, err := nodeutil.ReadyNodes(clientset, nodeInformer, "", stopChannel)
 	if err != nil {
 		klog.Fatalf("%v", err)
 	}
@@ -132,6 +134,14 @@ func TestE2E(t *testing.T) {
 	if err != nil {
 		t.Errorf("Error listing node with %v", err)
 	}
+	sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0)
+	nodeInformer := sharedInformerFactory.Core().V1().Nodes()
+
+	stopChannel := make(chan struct{}, 0)
+	sharedInformerFactory.Start(stopChannel)
+	sharedInformerFactory.WaitForCacheSync(stopChannel)
+	defer close(stopChannel)
+
 	// Assumption: We would have 3 node cluster by now. Kubeadm brings all the master components onto master node.
 	// So, the last node would have least utilization.
 	rc := RcByNameContainer("test-rc", int32(15), map[string]string{"test": "app"}, nil)
@@ -139,7 +149,7 @@
 	if err != nil {
 		t.Errorf("Error creating deployment %v", err)
 	}
-	evictPods(t, clientSet, nodeList, rc)
+	evictPods(t, clientSet, nodeInformer, nodeList, rc)
 
 	rc.Spec.Template.Annotations = map[string]string{"descheduler.alpha.kubernetes.io/evict": "true"}
 	rc.Spec.Replicas = func(i int32) *int32 { return &i }(15)
@@ -156,7 +166,7 @@
 	if err != nil {
 		t.Errorf("Error creating deployment %v", err)
 	}
-	evictPods(t, clientSet, nodeList, rc)
+	evictPods(t, clientSet, nodeInformer, nodeList, rc)
 }
 
 func TestDeschedulingInterval(t *testing.T) {
@@ -173,8 +183,13 @@
 
 	c := make(chan bool)
 	go func() {
-		err := descheduler.RunDeschedulerStrategies(s, deschedulerPolicy)
-		if err != nil {
+		evictionPolicyGroupVersion, err := eutils.SupportEviction(s.Client)
+		if err != nil || len(evictionPolicyGroupVersion) == 0 {
+			t.Errorf("Error when checking support for eviction: %v", err)
+		}
+
+		stopChannel := make(chan struct{})
+		if err := descheduler.RunDeschedulerStrategies(s, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel); err != nil {
 			t.Errorf("Error running descheduler strategies: %+v", err)
 		}
 		c <- true
@@ -188,7 +203,7 @@
 	}
 }
 
-func evictPods(t *testing.T, clientSet clientset.Interface, nodeList *v1.NodeList, rc *v1.ReplicationController) {
+func evictPods(t *testing.T, clientSet clientset.Interface, nodeInformer coreinformers.NodeInformer, nodeList *v1.NodeList, rc *v1.ReplicationController) {
 	var leastLoadedNode v1.Node
 	podsBefore := math.MaxInt16
 	for i := range nodeList.Items {
@@ -208,7 +223,7 @@
 		}
 	}
 	t.Log("Eviction of pods starting")
-	startEndToEndForLowNodeUtilization(clientSet)
+	startEndToEndForLowNodeUtilization(clientSet, nodeInformer)
 	podsOnleastUtilizedNode, err := podutil.ListEvictablePodsOnNode(clientSet, &leastLoadedNode, true)
 	if err != nil {
 		t.Errorf("Error listing pods on a node %v", err)
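
Note for reviewers: the sketch below is a minimal, standalone illustration of how the informer-based flow introduced by this patch is meant to be consumed; it mirrors what descheduler_test.go and node_test.go above do. The fake clientset, the zero resync period, and the nodeutil import alias/path are illustrative assumptions, not part of the patch itself.

package main

import (
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"

	// Assumed import path/alias for pkg/descheduler/node, matching how the
	// e2e test refers to it as nodeutil.
	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
)

func main() {
	// A fake clientset stands in for a real one built from a kubeconfig.
	client := fake.NewSimpleClientset()

	// Request the node informer before starting the factory so the factory
	// knows which informers to run; RunDeschedulerStrategies does the same.
	sharedInformerFactory := informers.NewSharedInformerFactory(client, 0)
	nodeInformer := sharedInformerFactory.Core().V1().Nodes()

	stopChannel := make(chan struct{})
	defer close(stopChannel)

	// Start the informers and wait for the caches to sync before listing.
	sharedInformerFactory.Start(stopChannel)
	sharedInformerFactory.WaitForCacheSync(stopChannel)

	// ReadyNodes now lists from the shared informer's lister instead of
	// building its own reflector (the removed GetNodeLister helper).
	nodes, err := nodeutil.ReadyNodes(client, nodeInformer, "", stopChannel)
	if err != nil {
		fmt.Printf("unable to list ready nodes: %v\n", err)
		return
	}
	fmt.Printf("found %d ready nodes\n", len(nodes))
}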