
Merge pull request kubernetes-sigs#249 from ingvagabund/list-node-through-informer-in-every-iteration

List nodes through informer in every iteration
Brien Dieterle committed Mar 4, 2020
2 parents bb39968 + f69ffb4 commit 274bb7a
Showing 5 changed files with 155 additions and 50 deletions.
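At a glance, the change replaces the ad-hoc reflector previously built in GetNodeLister with a single shared informer factory, so each descheduling iteration lists ready nodes from an informer cache that is kept current by a watch. Below is a minimal sketch of that pattern (illustrative only, not the exact code in this commit; the function and parameter names are made up):

```go
// Sketch only: illustrates the informer-backed node listing pattern this
// commit adopts. Names mirror the diff but this is not the commit's code.
package sketch

import (
	"time"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog"
)

func runLoop(client kubernetes.Interface, interval time.Duration, stopChannel chan struct{}) {
	// One shared informer factory for the whole process; its cache is kept
	// up to date by a single watch instead of a fresh reflector per call.
	factory := informers.NewSharedInformerFactory(client, 0)
	nodeInformer := factory.Core().V1().Nodes()

	factory.Start(stopChannel)
	factory.WaitForCacheSync(stopChannel)

	wait.Until(func() {
		// Listing through the informer's lister reads the in-memory cache,
		// so node changes (taints, labels) are seen on the next iteration.
		nodes, err := nodeInformer.Lister().List(labels.Everything())
		if err != nil {
			klog.V(1).Infof("Unable to list nodes: %v", err)
			close(stopChannel)
			return
		}
		_ = nodes // run the descheduler strategies against the fresh node list here
	}, interval, stopChannel)
}
```

Because the lister reads from the cache, the loop avoids re-listing from the API server on every iteration while still observing up-to-date node state.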
37 changes: 24 additions & 13 deletions pkg/descheduler/descheduler.go
@@ -22,6 +22,7 @@ import (
"k8s.io/klog"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/client"
@@ -46,28 +47,38 @@ func Run(rs *options.DeschedulerServer) error {
return fmt.Errorf("deschedulerPolicy is nil")
}

return RunDeschedulerStrategies(rs, deschedulerPolicy)
}

func RunDeschedulerStrategies(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy) error {
evictionPolicyGroupVersion, err := eutils.SupportEviction(rs.Client)
if err != nil || len(evictionPolicyGroupVersion) == 0 {
return err
}

stopChannel := make(chan struct{})
nodes, err := nodeutil.ReadyNodes(rs.Client, rs.NodeSelector, stopChannel)
if err != nil {
return err
}
return RunDeschedulerStrategies(rs, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel)
}

if len(nodes) <= 1 {
klog.V(1).Infof("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
return nil
}
func RunDeschedulerStrategies(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, stopChannel chan struct{}) error {
sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()

sharedInformerFactory.Start(stopChannel)
sharedInformerFactory.WaitForCacheSync(stopChannel)

nodePodCount := utils.InitializeNodePodCount(nodes)
wait.Until(func() {
nodes, err := nodeutil.ReadyNodes(rs.Client, nodeInformer, rs.NodeSelector, stopChannel)
if err != nil {
klog.V(1).Infof("Unable to get ready nodes: %v", err)
close(stopChannel)
return
}

if len(nodes) <= 1 {
klog.V(1).Infof("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
close(stopChannel)
return
}

nodePodCount := utils.InitializeNodePodCount(nodes)

strategies.RemoveDuplicatePods(rs, deschedulerPolicy.Strategies["RemoveDuplicates"], evictionPolicyGroupVersion, nodes, nodePodCount)
strategies.LowNodeUtilization(rs, deschedulerPolicy.Strategies["LowNodeUtilization"], evictionPolicyGroupVersion, nodes, nodePodCount)
strategies.RemovePodsViolatingInterPodAntiAffinity(rs, deschedulerPolicy.Strategies["RemovePodsViolatingInterPodAntiAffinity"], evictionPolicyGroupVersion, nodes, nodePodCount)
92 changes: 92 additions & 0 deletions pkg/descheduler/descheduler_test.go
@@ -0,0 +1,92 @@
package descheduler

import (
"fmt"
"strings"
"testing"
"time"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
fakeclientset "k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/test"
)

func TestTaintsUpdated(t *testing.T) {
n1 := test.BuildTestNode("n1", 2000, 3000, 10)
n2 := test.BuildTestNode("n2", 2000, 3000, 10)

p1 := test.BuildTestPod(fmt.Sprintf("pod_1_%s", n1.Name), 200, 0, n1.Name)
p1.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
{},
}

client := fakeclientset.NewSimpleClientset(n1, n2, p1)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{
"RemovePodsViolatingNodeTaints": api.DeschedulerStrategy{
Enabled: true,
},
},
}

stopChannel := make(chan struct{})
defer close(stopChannel)

rs := options.NewDeschedulerServer()
rs.Client = client
rs.DeschedulingInterval = 100 * time.Millisecond
go func() {
err := RunDeschedulerStrategies(rs, dp, "v1beta1", stopChannel)
if err != nil {
t.Fatalf("Unable to run descheduler strategies: %v", err)
}
}()

// Wait for a few cycles and then verify that the only pod still exists
time.Sleep(300 * time.Millisecond)
pods, err := client.CoreV1().Pods(p1.Namespace).List(metav1.ListOptions{})
if err != nil {
t.Errorf("Unable to list pods: %v", err)
}
if len(pods.Items) < 1 {
t.Errorf("The pod was evicted before a node was tained")
}

n1WithTaint := n1.DeepCopy()
n1WithTaint.Spec.Taints = []v1.Taint{
{
Key: "key",
Value: "value",
Effect: v1.TaintEffectNoSchedule,
},
}

if _, err := client.CoreV1().Nodes().Update(n1WithTaint); err != nil {
t.Fatalf("Unable to update node: %v\n", err)
}

if err := wait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
// Getting the evicted pod directly results in a panic
//pods, err := client.CoreV1().Pods(p1.Namespace).Get(p1.Name, metav1.GetOptions{})
// List is better; it does not panic.
// Though once the pod is evicted, List starts to error with "can't assign or convert v1beta1.Eviction into v1.Pod"
pods, err := client.CoreV1().Pods(p1.Namespace).List(metav1.ListOptions{})
if err == nil {
if len(pods.Items) > 0 {
return false, nil
}
return true, nil
}
if strings.Contains(err.Error(), "can't assign or convert v1beta1.Eviction into v1.Pod") {
return true, nil
}

return false, nil
}); err != nil {
t.Fatalf("Unable to evict pod, node taint did not get propagated to descheduler strategies")
}
}
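The test above works because an Update issued through the fake clientset is delivered to the shared informer's watch, so the lister observes the new taint within the next descheduling interval. A self-contained sketch of that interplay, as a hypothetical test that is not part of this commit:

```go
package sketch

import (
	"testing"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

// TestInformerSeesTaintUpdate is a hypothetical, standalone test showing the
// mechanism the commit's taint test relies on: the fake clientset's watch
// feeds the shared informer, so the lister sees the update on a later List.
func TestInformerSeesTaintUpdate(t *testing.T) {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}
	client := fake.NewSimpleClientset(node)

	stopChannel := make(chan struct{})
	defer close(stopChannel)

	sharedInformerFactory := informers.NewSharedInformerFactory(client, 0)
	nodeInformer := sharedInformerFactory.Core().V1().Nodes()
	sharedInformerFactory.Start(stopChannel)
	sharedInformerFactory.WaitForCacheSync(stopChannel)

	nodeWithTaint := node.DeepCopy()
	nodeWithTaint.Spec.Taints = []v1.Taint{
		{Key: "key", Value: "value", Effect: v1.TaintEffectNoSchedule},
	}
	if _, err := client.CoreV1().Nodes().Update(nodeWithTaint); err != nil {
		t.Fatalf("Unable to update node: %v", err)
	}

	// Give the watch event a moment to reach the informer cache.
	time.Sleep(100 * time.Millisecond)

	nodes, err := nodeInformer.Lister().List(labels.Everything())
	if err != nil || len(nodes) != 1 || len(nodes[0].Spec.Taints) == 0 {
		t.Fatalf("expected the taint update to be visible through the lister")
	}
}
```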
33 changes: 5 additions & 28 deletions pkg/descheduler/node/node.go
@@ -17,34 +17,27 @@ limitations under the License.
package node

import (
"time"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"sigs.k8s.io/descheduler/pkg/utils"
)

// ReadyNodes returns ready nodes irrespective of whether they are
// schedulable or not.
func ReadyNodes(client clientset.Interface, nodeSelector string, stopChannel <-chan struct{}) ([]*v1.Node, error) {
func ReadyNodes(client clientset.Interface, nodeInformer coreinformers.NodeInformer, nodeSelector string, stopChannel <-chan struct{}) ([]*v1.Node, error) {
ns, err := labels.Parse(nodeSelector)
if err != nil {
return []*v1.Node{}, err
}

var nodes []*v1.Node
nl := GetNodeLister(client, stopChannel)
if nl != nil {
// err is defined above
if nodes, err = nl.List(ns); err != nil {
return []*v1.Node{}, err
}
// err is defined above
if nodes, err = nodeInformer.Lister().List(ns); err != nil {
return []*v1.Node{}, err
}

if len(nodes) == 0 {
@@ -74,22 +67,6 @@ func ReadyNodes(client clientset.Interface, nodeSelector string, stopChannel <-c
return readyNodes, nil
}

func GetNodeLister(client clientset.Interface, stopChannel <-chan struct{}) corelisters.NodeLister {
if stopChannel == nil {
return nil
}
listWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "nodes", v1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
nodeLister := corelisters.NewNodeLister(store)
reflector := cache.NewReflector(listWatcher, &v1.Node{}, store, time.Hour)
go reflector.Run(stopChannel)

// To give some time so that listing works, chosen randomly
time.Sleep(100 * time.Millisecond)

return nodeLister
}

// IsReady checks if the descheduler could run against given node.
func IsReady(node *v1.Node) bool {
for i := range node.Status.Conditions {
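With GetNodeLister removed, the caller now builds and syncs the shared informer itself and passes the NodeInformer into ReadyNodes. A caller-side sketch of the new signature; the helper name and node selector here are made up for illustration:

```go
package sketch

import (
	"k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"

	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
)

// readyComputeNodes is a hypothetical helper: it builds the shared informer,
// waits for its cache to sync, then delegates to the new ReadyNodes
// signature, which lists nodes from the informer's cache.
func readyComputeNodes(client clientset.Interface, stopChannel chan struct{}) ([]*v1.Node, error) {
	sharedInformerFactory := informers.NewSharedInformerFactory(client, 0)
	nodeInformer := sharedInformerFactory.Core().V1().Nodes()

	sharedInformerFactory.Start(stopChannel)
	sharedInformerFactory.WaitForCacheSync(stopChannel)

	// "type=compute" is an illustrative node selector; any label selector works.
	return nodeutil.ReadyNodes(client, nodeInformer, "type=compute", stopChannel)
}
```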
12 changes: 11 additions & 1 deletion pkg/descheduler/node/node_test.go
@@ -21,6 +21,7 @@ import (

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/descheduler/test"
)
@@ -62,7 +63,16 @@ func TestReadyNodesWithNodeSelector(t *testing.T) {

fakeClient := fake.NewSimpleClientset(node1, node2)
nodeSelector := "type=compute"
nodes, _ := ReadyNodes(fakeClient, nodeSelector, nil)

sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()

stopChannel := make(chan struct{}, 0)
sharedInformerFactory.Start(stopChannel)
sharedInformerFactory.WaitForCacheSync(stopChannel)
defer close(stopChannel)

nodes, _ := ReadyNodes(fakeClient, nodeInformer, nodeSelector, nil)

if nodes[0].Name != "node1" {
t.Errorf("Expected node1, got %s", nodes[0].Name)
31 changes: 23 additions & 8 deletions test/e2e/e2e_test.go
@@ -24,6 +24,8 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
@@ -93,7 +95,7 @@ func RcByNameContainer(name string, replicas int32, labels map[string]string, gr
}

// startEndToEndForLowNodeUtilization tests the low node utilization strategy.
func startEndToEndForLowNodeUtilization(clientset clientset.Interface) {
func startEndToEndForLowNodeUtilization(clientset clientset.Interface, nodeInformer coreinformers.NodeInformer) {
var thresholds = make(deschedulerapi.ResourceThresholds)
var targetThresholds = make(deschedulerapi.ResourceThresholds)
thresholds[v1.ResourceMemory] = 20
@@ -108,7 +110,7 @@ func startEndToEndForLowNodeUtilization(clientset clientset.Interface) {
klog.Fatalf("%v", err)
}
stopChannel := make(chan struct{})
nodes, err := nodeutil.ReadyNodes(clientset, "", stopChannel)
nodes, err := nodeutil.ReadyNodes(clientset, nodeInformer, "", stopChannel)
if err != nil {
klog.Fatalf("%v", err)
}
@@ -132,14 +134,22 @@ func TestE2E(t *testing.T) {
if err != nil {
t.Errorf("Error listing node with %v", err)
}
sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()

stopChannel := make(chan struct{}, 0)
sharedInformerFactory.Start(stopChannel)
sharedInformerFactory.WaitForCacheSync(stopChannel)
defer close(stopChannel)

// Assumption: We would have a 3-node cluster by now. Kubeadm brings all the master components onto the master node.
// So, the last node would have the least utilization.
rc := RcByNameContainer("test-rc", int32(15), map[string]string{"test": "app"}, nil)
_, err = clientSet.CoreV1().ReplicationControllers("default").Create(rc)
if err != nil {
t.Errorf("Error creating deployment %v", err)
}
evictPods(t, clientSet, nodeList, rc)
evictPods(t, clientSet, nodeInformer, nodeList, rc)

rc.Spec.Template.Annotations = map[string]string{"descheduler.alpha.kubernetes.io/evict": "true"}
rc.Spec.Replicas = func(i int32) *int32 { return &i }(15)
@@ -156,7 +166,7 @@ func TestDeschedulingInterval(t *testing.T) {
if err != nil {
t.Errorf("Error creating deployment %v", err)
}
evictPods(t, clientSet, nodeList, rc)
evictPods(t, clientSet, nodeInformer, nodeList, rc)
}

func TestDeschedulingInterval(t *testing.T) {
@@ -173,8 +183,13 @@

c := make(chan bool)
go func() {
err := descheduler.RunDeschedulerStrategies(s, deschedulerPolicy)
if err != nil {
evictionPolicyGroupVersion, err := eutils.SupportEviction(s.Client)
if err != nil || len(evictionPolicyGroupVersion) == 0 {
t.Errorf("Error when checking support for eviction: %v", err)
}

stopChannel := make(chan struct{})
if err := descheduler.RunDeschedulerStrategies(s, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel); err != nil {
t.Errorf("Error running descheduler strategies: %+v", err)
}
c <- true
@@ -188,7 +203,7 @@ func TestDeschedulingInterval(t *testing.T) {
}
}

func evictPods(t *testing.T, clientSet clientset.Interface, nodeList *v1.NodeList, rc *v1.ReplicationController) {
func evictPods(t *testing.T, clientSet clientset.Interface, nodeInformer coreinformers.NodeInformer, nodeList *v1.NodeList, rc *v1.ReplicationController) {
var leastLoadedNode v1.Node
podsBefore := math.MaxInt16
for i := range nodeList.Items {
@@ -208,7 +223,7 @@ func evictPods(t *testing.T, clientSet clientset.Interface, nodeList *v1.NodeLis
}
}
t.Log("Eviction of pods starting")
startEndToEndForLowNodeUtilization(clientSet)
startEndToEndForLowNodeUtilization(clientSet, nodeInformer)
podsOnleastUtilizedNode, err := podutil.ListEvictablePodsOnNode(clientSet, &leastLoadedNode, true)
if err != nil {
t.Errorf("Error listing pods on a node %v", err)
