diff --git a/pkg/descheduler/node/node.go b/pkg/descheduler/node/node.go
index d283280656..2f0e58e89c 100644
--- a/pkg/descheduler/node/node.go
+++ b/pkg/descheduler/node/node.go
@@ -18,20 +18,24 @@ package node
 import (
 	"context"
+	"errors"
 	"fmt"
+	"sync/atomic"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	clientset "k8s.io/client-go/kubernetes"
 	listersv1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
 	"sigs.k8s.io/descheduler/pkg/utils"
 )
 
+const workersCount = 100
+
 // ReadyNodes returns ready nodes irrespective of whether they are
 // schedulable or not.
 func ReadyNodes(ctx context.Context, client clientset.Interface, nodeLister listersv1.NodeLister, nodeSelector string) ([]*v1.Node, error) {
@@ -104,90 +108,96 @@ func IsReady(node *v1.Node) bool {
 // This function is used when the NodeFit pod filtering feature of the Descheduler is enabled.
 // This function currently considers a subset of the Kubernetes Scheduler's predicates when
 // deciding if a pod would fit on a node, but more predicates may be added in the future.
-func NodeFit(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, node *v1.Node) []error {
+// This function must not modify the given node or pod.
+func NodeFit(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, node *v1.Node) error {
 	// Check node selector and required affinity
-	var errors []error
 	if ok, err := utils.PodMatchNodeSelector(pod, node); err != nil {
-		errors = append(errors, err)
+		return err
 	} else if !ok {
-		errors = append(errors, fmt.Errorf("pod node selector does not match the node label"))
+		return errors.New("pod node selector does not match the node label")
 	}
+
 	// Check taints (we only care about NoSchedule and NoExecute taints)
 	ok := utils.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, node.Spec.Taints, func(taint *v1.Taint) bool {
 		return taint.Effect == v1.TaintEffectNoSchedule || taint.Effect == v1.TaintEffectNoExecute
 	})
 	if !ok {
-		errors = append(errors, fmt.Errorf("pod does not tolerate taints on the node"))
+		return errors.New("pod does not tolerate taints on the node")
 	}
+
 	// Check if the pod can fit on a node based off it's requests
 	if pod.Spec.NodeName == "" || pod.Spec.NodeName != node.Name {
-		if ok, reqErrors := fitsRequest(nodeIndexer, pod, node); !ok {
-			errors = append(errors, reqErrors...)
+		if ok, reqError := fitsRequest(nodeIndexer, pod, node); !ok {
+			return reqError
 		}
 	}
+
 	// Check if node is schedulable
 	if IsNodeUnschedulable(node) {
-		errors = append(errors, fmt.Errorf("node is not schedulable"))
+		return errors.New("node is not schedulable")
 	}
 
 	// Check if pod matches inter-pod anti-affinity rule of pod on node
 	if match, err := podMatchesInterPodAntiAffinity(nodeIndexer, pod, node); err != nil {
-		errors = append(errors, err)
+		return err
 	} else if match {
-		errors = append(errors, fmt.Errorf("pod matches inter-pod anti-affinity rule of other pod on node"))
+		return errors.New("pod matches inter-pod anti-affinity rule of other pod on node")
 	}
 
-	return errors
+	return nil
 }
 
-// PodFitsAnyOtherNode checks if the given pod will fit any of the given nodes, besides the node
-// the pod is already running on. The predicates used to determine if the pod will fit can be found in the NodeFit function.
-func PodFitsAnyOtherNode(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nodes []*v1.Node) bool {
-	for _, node := range nodes {
-		// Skip node pod is already on
-		if node.Name == pod.Spec.NodeName {
-			continue
-		}
+func podFitsNodes(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nodes []*v1.Node, excludeFilter func(pod *v1.Pod, node *v1.Node) bool) bool {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 
-		errors := NodeFit(nodeIndexer, pod, node)
-		if len(errors) == 0 {
+	var filteredLen int32
+	checkNode := func(i int) {
+		node := nodes[i]
+		if excludeFilter != nil && excludeFilter(pod, node) {
+			return
+		}
+		err := NodeFit(nodeIndexer, pod, node)
+		if err == nil {
 			klog.V(4).InfoS("Pod fits on node", "pod", klog.KObj(pod), "node", klog.KObj(node))
-			return true
+			atomic.AddInt32(&filteredLen, 1)
+			cancel()
+		} else {
+			klog.V(4).InfoS("Pod does not fit on node", "pod", klog.KObj(pod), "node", klog.KObj(node), "err", err.Error())
 		}
-		klog.V(4).InfoS("Pod does not fit on any other node",
-			"pod:", klog.KObj(pod), "node:", klog.KObj(node), "error:", utilerrors.NewAggregate(errors).Error())
 	}
 
-	return false
+	// Stop searching for more nodes once a fitting node is found.
+	workqueue.ParallelizeUntil(ctx, workersCount, len(nodes), checkNode)
+
+	return filteredLen > 0
+}
+
+// PodFitsAnyOtherNode checks if the given pod will fit any of the given nodes, besides the node
+// the pod is already running on. The predicates used to determine if the pod will fit can be found in the NodeFit function.
+func PodFitsAnyOtherNode(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nodes []*v1.Node) bool {
+	return podFitsNodes(nodeIndexer, pod, nodes, func(pod *v1.Pod, node *v1.Node) bool {
+		return pod.Spec.NodeName == node.Name
+	})
 }
 
 // PodFitsAnyNode checks if the given pod will fit any of the given nodes. The predicates used
 // to determine if the pod will fit can be found in the NodeFit function.
 func PodFitsAnyNode(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nodes []*v1.Node) bool {
-	for _, node := range nodes {
-		errors := NodeFit(nodeIndexer, pod, node)
-		if len(errors) == 0 {
-			klog.V(4).InfoS("Pod fits on node", "pod", klog.KObj(pod), "node", klog.KObj(node))
-			return true
-		}
-		klog.V(4).InfoS("Pod does not fit on any node",
-			"pod:", klog.KObj(pod), "node:", klog.KObj(node), "error:", utilerrors.NewAggregate(errors).Error())
-	}
-
-	return false
+	return podFitsNodes(nodeIndexer, pod, nodes, nil)
 }
 
 // PodFitsCurrentNode checks if the given pod will fit onto the given node. The predicates used
 // to determine if the pod will fit can be found in the NodeFit function.
 func PodFitsCurrentNode(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, node *v1.Node) bool {
-	errors := NodeFit(nodeIndexer, pod, node)
-	if len(errors) == 0 {
+	err := NodeFit(nodeIndexer, pod, node)
+	if err == nil {
 		klog.V(4).InfoS("Pod fits on node", "pod", klog.KObj(pod), "node", klog.KObj(node))
 		return true
 	}
 
 	klog.V(4).InfoS("Pod does not fit on current node",
-		"pod:", klog.KObj(pod), "node:", klog.KObj(node), "error:", utilerrors.NewAggregate(errors).Error())
+		"pod", klog.KObj(pod), "node", klog.KObj(node), "error", err)
 
 	return false
 }
@@ -200,9 +210,7 @@ func IsNodeUnschedulable(node *v1.Node) bool {
 
 // fitsRequest determines if a pod can fit on a node based on its resource requests. It returns true if
 // the pod will fit.
-func fitsRequest(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, node *v1.Node) (bool, []error) {
-	var insufficientResources []error
-
+func fitsRequest(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, node *v1.Node) (bool, error) {
 	// Get pod requests
 	podRequests, _ := utils.PodRequestsAndLimits(pod)
 	resourceNames := make([]v1.ResourceName, 0, len(podRequests))
@@ -212,25 +220,22 @@ func fitsRequest(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nod
 
 	availableResources, err := nodeAvailableResources(nodeIndexer, node, resourceNames)
 	if err != nil {
-		return false, []error{err}
+		return false, err
 	}
 
-	podFitsOnNode := true
 	for _, resource := range resourceNames {
 		podResourceRequest := podRequests[resource]
 		availableResource, ok := availableResources[resource]
 		if !ok || podResourceRequest.MilliValue() > availableResource.MilliValue() {
-			insufficientResources = append(insufficientResources, fmt.Errorf("insufficient %v", resource))
-			podFitsOnNode = false
+			return false, fmt.Errorf("insufficient %v", resource)
 		}
 	}
 	// check pod num, at least one pod number is avaibalbe
 	if availableResources[v1.ResourcePods].MilliValue() <= 0 {
-		insufficientResources = append(insufficientResources, fmt.Errorf("insufficient %v", v1.ResourcePods))
-		podFitsOnNode = false
+		return false, fmt.Errorf("insufficient %v", v1.ResourcePods)
 	}
 
-	return podFitsOnNode, insufficientResources
+	return true, nil
 }
 
 // nodeAvailableResources returns resources mapped to the quanitity available on the node.
diff --git a/pkg/descheduler/node/node_test.go b/pkg/descheduler/node/node_test.go
index 383e18bc51..006156383b 100644
--- a/pkg/descheduler/node/node_test.go
+++ b/pkg/descheduler/node/node_test.go
@@ -19,6 +19,7 @@ package node
 import (
 	"context"
 	"errors"
+	"sync"
 	"testing"
 
 	v1 "k8s.io/api/core/v1"
@@ -230,7 +231,7 @@ func TestPodFitsAnyOtherNode(t *testing.T) {
 	nodeTaintValue := "gpu"
 
 	// Staging node has no scheduling restrictions, but the pod always starts here and PodFitsAnyOtherNode() doesn't take into account the node the pod is running on.
-	nodeNames := []string{"node1", "node2", "stagingNode"}
+	nodeNames := []string{"node1", "node2", "stagingNode", "node4"}
 
 	tests := []struct {
 		description string
@@ -716,6 +717,151 @@ func TestPodFitsAnyOtherNode(t *testing.T) {
 			},
 			success: false,
 		},
+		{
+			description: "There are four nodes. One node has a taint, and the other three nodes do not meet the resource requirements, should fail",
+			pod: test.BuildTestPod("p1", 1000, 2*1000*1000*1000, nodeNames[2], func(pod *v1.Pod) {
+				pod.Spec.NodeSelector = map[string]string{
+					nodeLabelKey: nodeLabelValue,
+				}
+				pod.Spec.Containers[0].Resources.Requests[v1.ResourceEphemeralStorage] = *resource.NewQuantity(10*1000*1000*1000, resource.DecimalSI)
+			}),
+			nodes: []*v1.Node{
+				test.BuildTestNode(nodeNames[0], 64000, 128*1000*1000*1000, 200, func(node *v1.Node) {
+					node.ObjectMeta.Labels = map[string]string{
+						nodeLabelKey: nodeLabelValue,
+					}
+
+					node.Status.Allocatable[v1.ResourceEphemeralStorage] = *resource.NewQuantity(1000*1000*1000*1000, resource.DecimalSI)
+					node.Spec.Taints = []v1.Taint{
+						{
+							Key:    nodeTaintKey,
+							Value:  nodeTaintValue,
+							Effect: v1.TaintEffectNoSchedule,
+						},
+					}
+				}),
+				test.BuildTestNode(nodeNames[1], 3000, 8*1000*1000*1000, 12, func(node *v1.Node) {
+					node.ObjectMeta.Labels = map[string]string{
+						nodeLabelKey: nodeLabelValue,
+					}
+
+					node.Status.Allocatable[v1.ResourceEphemeralStorage] = *resource.NewQuantity(200*1000*1000*1000, resource.DecimalSI)
+				}),
+				test.BuildTestNode(nodeNames[2], 3000, 8*1000*1000*1000, 12, nil),
+				test.BuildTestNode(nodeNames[3], 0, 0, 0, nil),
+			},
+			podsOnNodes: []*v1.Pod{
+				test.BuildTestPod("3-core-pod", 2000, 4*1000*1000*1000, nodeNames[1], func(pod *v1.Pod) {
+					pod.ObjectMeta = metav1.ObjectMeta{
+						Namespace: "test",
+						Labels: map[string]string{
+							"test": "true",
+						},
+					}
+					pod.Spec.Containers[0].Resources.Requests[v1.ResourceEphemeralStorage] = *resource.NewQuantity(10*1000*1000*1000, resource.DecimalSI)
+					pod.Spec.Overhead = createResourceList(1000, 1000*1000*1000, 1000*1000*1000)
+				}),
+			},
+			success: false,
+		},
+		{
+			description: "There are four nodes. First node has a taint, second node has no label, third node does not meet the resource requirements, only the fourth node meets the requirements, should succeed",
+			pod: test.BuildTestPod("p1", 1000, 2*1000*1000*1000, nodeNames[2], func(pod *v1.Pod) {
+				pod.Spec.NodeSelector = map[string]string{
+					nodeLabelKey: nodeLabelValue,
+				}
+				pod.Spec.Containers[0].Resources.Requests[v1.ResourceEphemeralStorage] = *resource.NewQuantity(10*1000*1000*1000, resource.DecimalSI)
+			}),
+			nodes: []*v1.Node{
+				test.BuildTestNode(nodeNames[0], 64000, 128*1000*1000*1000, 200, func(node *v1.Node) {
+					node.ObjectMeta.Labels = map[string]string{
+						nodeLabelKey: nodeLabelValue,
+					}
+
+					node.Status.Allocatable[v1.ResourceEphemeralStorage] = *resource.NewQuantity(1000*1000*1000*1000, resource.DecimalSI)
+					node.Spec.Taints = []v1.Taint{
+						{
+							Key:    nodeTaintKey,
+							Value:  nodeTaintValue,
+							Effect: v1.TaintEffectNoSchedule,
+						},
+					}
+				}),
+				test.BuildTestNode(nodeNames[1], 8000, 8*1000*1000*1000, 12, func(node *v1.Node) {
+					node.Status.Allocatable[v1.ResourceEphemeralStorage] = *resource.NewQuantity(200*1000*1000*1000, resource.DecimalSI)
+				}),
+				test.BuildTestNode(nodeNames[2], 1000, 8*1000*1000*1000, 12, nil),
+				test.BuildTestNode(nodeNames[3], 8000, 8*1000*1000*1000, 12, func(node *v1.Node) {
+					node.ObjectMeta.Labels = map[string]string{
+						nodeLabelKey: nodeLabelValue,
+					}
+
+					node.Status.Allocatable[v1.ResourceEphemeralStorage] = *resource.NewQuantity(1000*1000*1000*1000, resource.DecimalSI)
+				}),
+			},
+			podsOnNodes: []*v1.Pod{
+				test.BuildTestPod("3-core-pod", 2000, 4*1000*1000*1000, nodeNames[1], func(pod *v1.Pod) {
+					pod.ObjectMeta = metav1.ObjectMeta{
+						Namespace: "test",
+						Labels: map[string]string{
+							"test": "true",
+						},
+					}
+					pod.Spec.Containers[0].Resources.Requests[v1.ResourceEphemeralStorage] = *resource.NewQuantity(10*1000*1000*1000, resource.DecimalSI)
+					pod.Spec.Overhead = createResourceList(1000, 1000*1000*1000, 1000*1000*1000)
+				}),
+			},
+			success: true,
+		},
+		{
+			description: "There are four nodes. First node has a taint, second node has no label, third node does not meet the resource requirements, fourth node is the one where the pod is located, should fail",
+			pod: test.BuildTestPod("p1", 1000, 2*1000*1000*1000, nodeNames[3], func(pod *v1.Pod) {
+				pod.Spec.NodeSelector = map[string]string{
+					nodeLabelKey: nodeLabelValue,
+				}
+				pod.Spec.Containers[0].Resources.Requests[v1.ResourceEphemeralStorage] = *resource.NewQuantity(10*1000*1000*1000, resource.DecimalSI)
+			}),
+			nodes: []*v1.Node{
+				test.BuildTestNode(nodeNames[0], 64000, 128*1000*1000*1000, 200, func(node *v1.Node) {
+					node.ObjectMeta.Labels = map[string]string{
+						nodeLabelKey: nodeLabelValue,
+					}
+
+					node.Status.Allocatable[v1.ResourceEphemeralStorage] = *resource.NewQuantity(1000*1000*1000*1000, resource.DecimalSI)
+					node.Spec.Taints = []v1.Taint{
+						{
+							Key:    nodeTaintKey,
+							Value:  nodeTaintValue,
+							Effect: v1.TaintEffectNoSchedule,
+						},
+					}
+				}),
+				test.BuildTestNode(nodeNames[1], 8000, 8*1000*1000*1000, 12, func(node *v1.Node) {
+					node.Status.Allocatable[v1.ResourceEphemeralStorage] = *resource.NewQuantity(200*1000*1000*1000, resource.DecimalSI)
+				}),
+				test.BuildTestNode(nodeNames[2], 1000, 8*1000*1000*1000, 12, nil),
+				test.BuildTestNode(nodeNames[3], 8000, 8*1000*1000*1000, 12, func(node *v1.Node) {
+					node.ObjectMeta.Labels = map[string]string{
+						nodeLabelKey: nodeLabelValue,
+					}
+
+					node.Status.Allocatable[v1.ResourceEphemeralStorage] = *resource.NewQuantity(1000*1000*1000*1000, resource.DecimalSI)
+				}),
+			},
+			podsOnNodes: []*v1.Pod{
+				test.BuildTestPod("3-core-pod", 2000, 4*1000*1000*1000, nodeNames[1], func(pod *v1.Pod) {
+					pod.ObjectMeta = metav1.ObjectMeta{
+						Namespace: "test",
+						Labels: map[string]string{
+							"test": "true",
+						},
+					}
+					pod.Spec.Containers[0].Resources.Requests[v1.ResourceEphemeralStorage] = *resource.NewQuantity(10*1000*1000*1000, resource.DecimalSI)
+					pod.Spec.Overhead = createResourceList(1000, 1000*1000*1000, 1000*1000*1000)
+				}),
+			},
+			success: false,
+		},
 	}
 
 	for _, tc := range tests {
@@ -753,6 +899,51 @@ func TestPodFitsAnyOtherNode(t *testing.T) {
 	}
 }
 
+func TestPodFitsNodes(t *testing.T) {
+	nodeNames := []string{"node1", "node2", "node3", "node4"}
+	pod := test.BuildTestPod("p1", 950, 2*1000*1000*1000, nodeNames[0], nil)
+	nodes := []*v1.Node{
+		test.BuildTestNode(nodeNames[0], 1000, 8*1000*1000*1000, 12, nil),
+		test.BuildTestNode(nodeNames[1], 200, 8*1000*1000*1000, 12, nil),
+		test.BuildTestNode(nodeNames[2], 300, 8*1000*1000*1000, 12, nil),
+		test.BuildTestNode(nodeNames[3], 400, 8*1000*1000*1000, 12, nil),
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var objs []runtime.Object
+	for _, node := range nodes {
+		objs = append(objs, node)
+	}
+	objs = append(objs, pod)
+
+	fakeClient := fake.NewSimpleClientset(objs...)
+
+	sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
+	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
+
+	getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
+	if err != nil {
+		t.Errorf("Build get pods assigned to node function error: %v", err)
+	}
+
+	sharedInformerFactory.Start(ctx.Done())
+	sharedInformerFactory.WaitForCacheSync(ctx.Done())
+
+	var nodesTraversed sync.Map
+	podFitsNodes(getPodsAssignedToNode, pod, nodes, func(pod *v1.Pod, node *v1.Node) bool {
+		nodesTraversed.Store(node.Name, node)
+		return true
+	})
+
+	for _, node := range nodes {
+		if _, exists := nodesTraversed.Load(node.Name); !exists {
+			t.Errorf("Node %v was not processed", node.Name)
+		}
+	}
+}
+
 func TestNodeFit(t *testing.T) {
 	node := test.BuildTestNode("node", 64000, 128*1000*1000*1000, 2, func(node *v1.Node) {
 		node.ObjectMeta.Labels = map[string]string{
@@ -852,9 +1043,9 @@ func TestNodeFit(t *testing.T) {
 			sharedInformerFactory.Start(ctx.Done())
 			sharedInformerFactory.WaitForCacheSync(ctx.Done())
 
-			errs := NodeFit(getPodsAssignedToNode, tc.pod, tc.node)
-			if (len(errs) == 0 && tc.err != nil) || (len(errs) > 0 && errs[0].Error() != tc.err.Error()) {
-				t.Errorf("Test %#v failed, got %v, expect %v", tc.description, errs, tc.err)
+			err = NodeFit(getPodsAssignedToNode, tc.pod, tc.node)
+			if (err == nil && tc.err != nil) || (err != nil && err.Error() != tc.err.Error()) {
+				t.Errorf("Test %#v failed, got %v, expect %v", tc.description, err, tc.err)
 			}
 		})
 	}
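
Reviewer note (not part of the patch): the heart of this change is the early-exit parallel search in podFitsNodes. NodeFit runs over the candidate nodes via workqueue.ParallelizeUntil, a hit is recorded in an atomic counter, and the shared context is cancelled so no further nodes are handed out once one fitting node is found. The standalone sketch below shows the same pattern reduced to a generic predicate over a slice; anyItemMatches, pred, and the sample values are hypothetical names for illustration only, and the only real API assumed is client-go's workqueue.ParallelizeUntil.

// anyitemmatches.go — illustrative sketch of the early-exit parallel pattern used by podFitsNodes.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// anyItemMatches reports whether pred holds for at least one item, checking
// items in parallel and stopping the search as soon as one match is found.
func anyItemMatches(items []int, workers int, pred func(int) bool) bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var matches int32
	check := func(i int) {
		if pred(items[i]) {
			atomic.AddInt32(&matches, 1) // remember that something matched
			cancel()                     // stop handing out the remaining items
		}
	}
	// ParallelizeUntil stops dispatching new indices once ctx is cancelled;
	// workers that are mid-check simply finish their current item.
	workqueue.ParallelizeUntil(ctx, workers, len(items), check)

	return atomic.LoadInt32(&matches) > 0
}

func main() {
	items := []int{3, 9, 14, 27, 42}
	found := anyItemMatches(items, 4, func(v int) bool {
		time.Sleep(10 * time.Millisecond) // stand-in for an expensive check such as NodeFit
		return v%2 == 0
	})
	fmt.Println("found a matching item:", found)
}

Cancelling the context only prevents ParallelizeUntil from dispatching new pieces; workers already inside the predicate finish their current item, so a few extra NodeFit calls may still complete after the first fit. That is acceptable precisely because NodeFit is side-effect free, which is what the comment added to NodeFit in this patch is meant to guarantee.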