List nodes through informer in every iteration #249

Merged
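In short, this PR builds a single shared informer factory up front, starts it, waits for its cache to sync, and then lists ready nodes from the informer on every descheduling iteration instead of listing them once before the loop. Below is a minimal sketch of that pattern, assuming a plain client-go clientset; the wiring (the function name runLoop and the error handling) is illustrative and not the PR's exact code.

package descheduler

import (
	"time"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog"
)

// runLoop lists nodes from the informer cache on every iteration rather than
// querying the API server once before the loop starts.
func runLoop(client kubernetes.Interface, interval time.Duration, stopCh chan struct{}) {
	factory := informers.NewSharedInformerFactory(client, 0)
	nodeInformer := factory.Core().V1().Nodes()

	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	wait.Until(func() {
		// Served from the informer's local cache, so this stays cheap even
		// when it runs on every descheduling interval.
		nodes, err := nodeInformer.Lister().List(labels.Everything())
		if err != nil {
			klog.V(1).Infof("Unable to list nodes: %v", err)
			close(stopCh)
			return
		}
		klog.V(1).Infof("Listed %d nodes from the informer cache", len(nodes))
		// Each strategy would receive the freshly listed nodes here.
	}, interval, stopCh)
}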
37 changes: 24 additions & 13 deletions pkg/descheduler/descheduler.go
@@ -22,6 +22,7 @@ import (
"k8s.io/klog"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/client"
@@ -46,28 +47,38 @@ func Run(rs *options.DeschedulerServer) error {
return fmt.Errorf("deschedulerPolicy is nil")
}

return RunDeschedulerStrategies(rs, deschedulerPolicy)
}

func RunDeschedulerStrategies(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy) error {
evictionPolicyGroupVersion, err := eutils.SupportEviction(rs.Client)
if err != nil || len(evictionPolicyGroupVersion) == 0 {
return err
}

stopChannel := make(chan struct{})
nodes, err := nodeutil.ReadyNodes(rs.Client, rs.NodeSelector, stopChannel)
if err != nil {
return err
}
return RunDeschedulerStrategies(rs, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel)
}

if len(nodes) <= 1 {
klog.V(1).Infof("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
return nil
}
func RunDeschedulerStrategies(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, stopChannel chan struct{}) error {
sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()

sharedInformerFactory.Start(stopChannel)
sharedInformerFactory.WaitForCacheSync(stopChannel)

nodePodCount := utils.InitializeNodePodCount(nodes)
wait.Until(func() {
nodes, err := nodeutil.ReadyNodes(rs.Client, nodeInformer, rs.NodeSelector, stopChannel)
if err != nil {
klog.V(1).Infof("Unable to get ready nodes: %v", err)
close(stopChannel)
return
}

if len(nodes) <= 1 {
Contributor:

Is this number a good candidate for a configurable option in a future PR? e.g. a user might want to set this to match the number of AZs in their cluster.

@ingvagabund (Contributor Author), Mar 4, 2020:

Two nodes is the minimal condition for running the descheduler. If you also need to verify that each AZ has at least one node available, what are the chances you won't have that before running the descheduler anyway? It may also be more convenient to leave this to the individual strategies: no strategy should evict a pod if there is no other node where the new pod can land, which includes constraining strategies to be zone aware.

klog.V(1).Infof("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
close(stopChannel)
return
}

nodePodCount := utils.InitializeNodePodCount(nodes)

strategies.RemoveDuplicatePods(rs, deschedulerPolicy.Strategies["RemoveDuplicates"], evictionPolicyGroupVersion, nodes, nodePodCount)
strategies.LowNodeUtilization(rs, deschedulerPolicy.Strategies["LowNodeUtilization"], evictionPolicyGroupVersion, nodes, nodePodCount)
strategies.RemovePodsViolatingInterPodAntiAffinity(rs, deschedulerPolicy.Strategies["RemovePodsViolatingInterPodAntiAffinity"], evictionPolicyGroupVersion, nodes, nodePodCount)
92 changes: 92 additions & 0 deletions pkg/descheduler/descheduler_test.go
@@ -0,0 +1,92 @@
package descheduler

import (
"fmt"
"strings"
"testing"
"time"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
fakeclientset "k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/test"
)

func TestTaintsUpdated(t *testing.T) {
n1 := test.BuildTestNode("n1", 2000, 3000, 10)
n2 := test.BuildTestNode("n2", 2000, 3000, 10)

p1 := test.BuildTestPod(fmt.Sprintf("pod_1_%s", n1.Name), 200, 0, n1.Name)
p1.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
{},
}

client := fakeclientset.NewSimpleClientset(n1, n2, p1)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{
"RemovePodsViolatingNodeTaints": api.DeschedulerStrategy{
Enabled: true,
},
},
}

stopChannel := make(chan struct{})
defer close(stopChannel)

rs := options.NewDeschedulerServer()
rs.Client = client
rs.DeschedulingInterval = 100 * time.Millisecond
go func() {
err := RunDeschedulerStrategies(rs, dp, "v1beta1", stopChannel)
if err != nil {
t.Fatalf("Unable to run descheduler strategies: %v", err)
}
}()

// Wait for a few cycles and then verify the only pod still exists
time.Sleep(300 * time.Millisecond)
pods, err := client.CoreV1().Pods(p1.Namespace).List(metav1.ListOptions{})
if err != nil {
t.Errorf("Unable to list pods: %v", err)
}
if len(pods.Items) < 1 {
t.Errorf("The pod was evicted before a node was tained")
}

n1WithTaint := n1.DeepCopy()
n1WithTaint.Spec.Taints = []v1.Taint{
{
Key: "key",
Value: "value",
Effect: v1.TaintEffectNoSchedule,
},
}

if _, err := client.CoreV1().Nodes().Update(n1WithTaint); err != nil {
t.Fatalf("Unable to update node: %v\n", err)
}

if err := wait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
// Getting the evicted pod directly results in a panic with the fake clientset:
//pods, err := client.CoreV1().Pods(p1.Namespace).Get(p1.Name, metav1.GetOptions{})
// List is safer, it does not panic.
// Though once the pod is evicted, List starts to fail with "can't assign or convert v1beta1.Eviction into v1.Pod"
pods, err := client.CoreV1().Pods(p1.Namespace).List(metav1.ListOptions{})
if err == nil {
if len(pods.Items) > 0 {
return false, nil
}
return true, nil
}
if strings.Contains(err.Error(), "can't assign or convert v1beta1.Eviction into v1.Pod") {
return true, nil
}

return false, nil
}); err != nil {
t.Fatalf("Unable to evict pod, node taint did not get propagated to descheduler strategies")
}
}
33 changes: 5 additions & 28 deletions pkg/descheduler/node/node.go
@@ -17,34 +17,27 @@ limitations under the License.
package node

import (
"time"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"sigs.k8s.io/descheduler/pkg/utils"
)

// ReadyNodes returns ready nodes irrespective of whether they are
// schedulable or not.
func ReadyNodes(client clientset.Interface, nodeSelector string, stopChannel <-chan struct{}) ([]*v1.Node, error) {
func ReadyNodes(client clientset.Interface, nodeInformer coreinformers.NodeInformer, nodeSelector string, stopChannel <-chan struct{}) ([]*v1.Node, error) {
ns, err := labels.Parse(nodeSelector)
if err != nil {
return []*v1.Node{}, err
}

var nodes []*v1.Node
nl := GetNodeLister(client, stopChannel)
if nl != nil {
// err is defined above
if nodes, err = nl.List(ns); err != nil {
return []*v1.Node{}, err
}
// err is defined above
if nodes, err = nodeInformer.Lister().List(ns); err != nil {
return []*v1.Node{}, err
}

if len(nodes) == 0 {
@@ -74,22 +67,6 @@ func ReadyNodes(client clientset.Interface, nodeSelector string, stopChannel <-c
return readyNodes, nil
}

func GetNodeLister(client clientset.Interface, stopChannel <-chan struct{}) corelisters.NodeLister {
if stopChannel == nil {
return nil
}
listWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "nodes", v1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
nodeLister := corelisters.NewNodeLister(store)
reflector := cache.NewReflector(listWatcher, &v1.Node{}, store, time.Hour)
go reflector.Run(stopChannel)

// To give some time so that listing works, chosen randomly
time.Sleep(100 * time.Millisecond)

return nodeLister
}

// IsReady checks if the descheduler could run against given node.
func IsReady(node *v1.Node) bool {
for i := range node.Status.Conditions {
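The node.go hunk above also removes GetNodeLister, which spun up its own reflector and then slept for an arbitrary 100 ms before listing could work. With the shared informer factory, cache readiness is explicit via WaitForCacheSync. An equivalent variant that waits on the node informer's own HasSynced is sketched below; it is illustrative only (the helper name waitForNodes is made up) and not code from this PR.

import (
	"fmt"

	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/tools/cache"
)

// waitForNodes blocks until the node informer cache has synced, replacing the
// old fixed sleep with an explicit readiness check.
func waitForNodes(factory informers.SharedInformerFactory, nodeInformer coreinformers.NodeInformer, stopCh <-chan struct{}) error {
	factory.Start(stopCh)
	if !cache.WaitForCacheSync(stopCh, nodeInformer.Informer().HasSynced) {
		return fmt.Errorf("timed out waiting for the node informer cache to sync")
	}
	return nil
}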
12 changes: 11 additions & 1 deletion pkg/descheduler/node/node_test.go
@@ -21,6 +21,7 @@ import (

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/descheduler/test"
)
@@ -62,7 +63,16 @@ func TestReadyNodesWithNodeSelector(t *testing.T) {

fakeClient := fake.NewSimpleClientset(node1, node2)
nodeSelector := "type=compute"
nodes, _ := ReadyNodes(fakeClient, nodeSelector, nil)

sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()

stopChannel := make(chan struct{}, 0)
sharedInformerFactory.Start(stopChannel)
sharedInformerFactory.WaitForCacheSync(stopChannel)
defer close(stopChannel)

nodes, _ := ReadyNodes(fakeClient, nodeInformer, nodeSelector, nil)

if nodes[0].Name != "node1" {
t.Errorf("Expected node1, got %s", nodes[0].Name)
31 changes: 23 additions & 8 deletions test/e2e/e2e_test.go
@@ -24,6 +24,8 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
@@ -93,7 +95,7 @@ func RcByNameContainer(name string, replicas int32, labels map[string]string, gr
}

// startEndToEndForLowNodeUtilization tests the low node utilization strategy.
func startEndToEndForLowNodeUtilization(clientset clientset.Interface) {
func startEndToEndForLowNodeUtilization(clientset clientset.Interface, nodeInformer coreinformers.NodeInformer) {
var thresholds = make(deschedulerapi.ResourceThresholds)
var targetThresholds = make(deschedulerapi.ResourceThresholds)
thresholds[v1.ResourceMemory] = 20
@@ -108,7 +110,7 @@ func startEndToEndForLowNodeUtilization(clientset clientset.Interface) {
klog.Fatalf("%v", err)
}
stopChannel := make(chan struct{})
nodes, err := nodeutil.ReadyNodes(clientset, "", stopChannel)
nodes, err := nodeutil.ReadyNodes(clientset, nodeInformer, "", stopChannel)
if err != nil {
klog.Fatalf("%v", err)
}
@@ -132,14 +134,22 @@ func TestE2E(t *testing.T) {
if err != nil {
t.Errorf("Error listing node with %v", err)
}
sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()

stopChannel := make(chan struct{}, 0)
sharedInformerFactory.Start(stopChannel)
sharedInformerFactory.WaitForCacheSync(stopChannel)
defer close(stopChannel)

// Assumption: We would have a 3-node cluster by now. Kubeadm brings all the master components onto the master node.
// So, the last node would have the least utilization.
rc := RcByNameContainer("test-rc", int32(15), map[string]string{"test": "app"}, nil)
_, err = clientSet.CoreV1().ReplicationControllers("default").Create(rc)
if err != nil {
t.Errorf("Error creating deployment %v", err)
}
evictPods(t, clientSet, nodeList, rc)
evictPods(t, clientSet, nodeInformer, nodeList, rc)

rc.Spec.Template.Annotations = map[string]string{"descheduler.alpha.kubernetes.io/evict": "true"}
rc.Spec.Replicas = func(i int32) *int32 { return &i }(15)
@@ -156,7 +166,7 @@ func TestDeschedulingInterval(t *testing.T) {
if err != nil {
t.Errorf("Error creating deployment %v", err)
}
evictPods(t, clientSet, nodeList, rc)
evictPods(t, clientSet, nodeInformer, nodeList, rc)
}

func TestDeschedulingInterval(t *testing.T) {
@@ -173,8 +183,13 @@ func TestDeschedulingInterval(t *testing.T) {

c := make(chan bool)
go func() {
err := descheduler.RunDeschedulerStrategies(s, deschedulerPolicy)
if err != nil {
evictionPolicyGroupVersion, err := eutils.SupportEviction(s.Client)
if err != nil || len(evictionPolicyGroupVersion) == 0 {
t.Errorf("Error when checking support for eviction: %v", err)
}

stopChannel := make(chan struct{})
if err := descheduler.RunDeschedulerStrategies(s, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel); err != nil {
t.Errorf("Error running descheduler strategies: %+v", err)
}
c <- true
@@ -188,7 +203,7 @@ func TestDeschedulingInterval(t *testing.T) {
}
}

func evictPods(t *testing.T, clientSet clientset.Interface, nodeList *v1.NodeList, rc *v1.ReplicationController) {
func evictPods(t *testing.T, clientSet clientset.Interface, nodeInformer coreinformers.NodeInformer, nodeList *v1.NodeList, rc *v1.ReplicationController) {
var leastLoadedNode v1.Node
podsBefore := math.MaxInt16
for i := range nodeList.Items {
Expand All @@ -208,7 +223,7 @@ func evictPods(t *testing.T, clientSet clientset.Interface, nodeList *v1.NodeLis
}
}
t.Log("Eviction of pods starting")
startEndToEndForLowNodeUtilization(clientSet)
startEndToEndForLowNodeUtilization(clientSet, nodeInformer)
podsOnleastUtilizedNode, err := podutil.ListEvictablePodsOnNode(clientSet, &leastLoadedNode, true)
if err != nil {
t.Errorf("Error listing pods on a node %v", err)