return node fit error in advance
fanhaouu committed Jun 12, 2024
1 parent 69e5c5a commit a2e871f
Showing 2 changed files with 62 additions and 44 deletions.
pkg/descheduler/node/node.go (59 additions, 41 deletions)

@@ -18,20 +18,24 @@ package node

 import (
     "context"
+    "errors"
     "fmt"
+    "sync/atomic"
 
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
-    utilerrors "k8s.io/apimachinery/pkg/util/errors"
     clientset "k8s.io/client-go/kubernetes"
     listersv1 "k8s.io/client-go/listers/core/v1"
+    "k8s.io/client-go/util/workqueue"
     "k8s.io/klog/v2"
     podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
     "sigs.k8s.io/descheduler/pkg/utils"
 )
 
+const workersCount = 100
+
 // ReadyNodes returns ready nodes irrespective of whether they are
 // schedulable or not.
 func ReadyNodes(ctx context.Context, client clientset.Interface, nodeLister listersv1.NodeLister, nodeSelector string) ([]*v1.Node, error) {
@@ -104,90 +108,109 @@ func IsReady(node *v1.Node) bool {
 // This function is used when the NodeFit pod filtering feature of the Descheduler is enabled.
 // This function currently considers a subset of the Kubernetes Scheduler's predicates when
 // deciding if a pod would fit on a node, but more predicates may be added in the future.
-func NodeFit(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, node *v1.Node) []error {
+func NodeFit(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, node *v1.Node) error {
     // Check node selector and required affinity
-    var errors []error
     if ok, err := utils.PodMatchNodeSelector(pod, node); err != nil {
-        errors = append(errors, err)
+        return err
     } else if !ok {
-        errors = append(errors, fmt.Errorf("pod node selector does not match the node label"))
+        return errors.New("pod node selector does not match the node label")
     }
 
     // Check taints (we only care about NoSchedule and NoExecute taints)
     ok := utils.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, node.Spec.Taints, func(taint *v1.Taint) bool {
         return taint.Effect == v1.TaintEffectNoSchedule || taint.Effect == v1.TaintEffectNoExecute
     })
     if !ok {
-        errors = append(errors, fmt.Errorf("pod does not tolerate taints on the node"))
+        return errors.New("pod does not tolerate taints on the node")
     }
 
     // Check if the pod can fit on a node based on its requests
     if pod.Spec.NodeName == "" || pod.Spec.NodeName != node.Name {
-        if ok, reqErrors := fitsRequest(nodeIndexer, pod, node); !ok {
-            errors = append(errors, reqErrors...)
+        if ok, reqError := fitsRequest(nodeIndexer, pod, node); !ok {
+            return reqError
         }
     }
 
     // Check if node is schedulable
     if IsNodeUnschedulable(node) {
-        errors = append(errors, fmt.Errorf("node is not schedulable"))
+        return errors.New("node is not schedulable")
     }
 
     // Check if pod matches inter-pod anti-affinity rule of pod on node
     if match, err := podMatchesInterPodAntiAffinity(nodeIndexer, pod, node); err != nil {
-        errors = append(errors, err)
+        return err
     } else if match {
-        errors = append(errors, fmt.Errorf("pod matches inter-pod anti-affinity rule of other pod on node"))
+        return errors.New("pod matches inter-pod anti-affinity rule of other pod on node")
     }
 
-    return errors
+    return nil
 }

 // PodFitsAnyOtherNode checks if the given pod will fit any of the given nodes, besides the node
 // the pod is already running on. The predicates used to determine if the pod will fit can be found in the NodeFit function.
 func PodFitsAnyOtherNode(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nodes []*v1.Node) bool {
-    for _, node := range nodes {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    var filteredLen int32
+    checkNode := func(i int) {
+        node := nodes[i]
         // Skip node pod is already on
         if node.Name == pod.Spec.NodeName {
-            continue
+            return
         }
 
-        errors := NodeFit(nodeIndexer, pod, node)
-        if len(errors) == 0 {
+        err := NodeFit(nodeIndexer, pod, node)
+        if err == nil {
             klog.V(4).InfoS("Pod fits on node", "pod", klog.KObj(pod), "node", klog.KObj(node))
-            return true
+            atomic.AddInt32(&filteredLen, 1)
+            cancel()
+        } else {
+            klog.V(4).InfoS("Pod does not fit on node", "pod", klog.KObj(pod), "node", klog.KObj(node), "err", err.Error())
         }
-        klog.V(4).InfoS("Pod does not fit on any other node",
-            "pod:", klog.KObj(pod), "node:", klog.KObj(node), "error:", utilerrors.NewAggregate(errors).Error())
     }
 
-    return false
+    // Stop searching for more nodes once a fitting node is found.
+    workqueue.ParallelizeUntil(ctx, workersCount, len(nodes), checkNode)
+
+    return filteredLen > 0
 }

 // PodFitsAnyNode checks if the given pod will fit any of the given nodes. The predicates used
 // to determine if the pod will fit can be found in the NodeFit function.
 func PodFitsAnyNode(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nodes []*v1.Node) bool {
-    for _, node := range nodes {
-        errors := NodeFit(nodeIndexer, pod, node)
-        if len(errors) == 0 {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    var filteredLen int32
+    checkNode := func(i int) {
+        node := nodes[i]
+        err := NodeFit(nodeIndexer, pod, node)
+        if err == nil {
             klog.V(4).InfoS("Pod fits on node", "pod", klog.KObj(pod), "node", klog.KObj(node))
-            return true
+            atomic.AddInt32(&filteredLen, 1)
+            cancel()
+        } else {
+            klog.V(4).InfoS("Pod does not fit on node", "pod", klog.KObj(pod), "node", klog.KObj(node), "err", err.Error())
         }
-        klog.V(4).InfoS("Pod does not fit on any node",
-            "pod:", klog.KObj(pod), "node:", klog.KObj(node), "error:", utilerrors.NewAggregate(errors).Error())
     }
 
-    return false
+    // Stop searching for more nodes once a fitting node is found.
+    workqueue.ParallelizeUntil(ctx, workersCount, len(nodes), checkNode)
+
+    return filteredLen > 0
 }

 // PodFitsCurrentNode checks if the given pod will fit onto the given node. The predicates used
 // to determine if the pod will fit can be found in the NodeFit function.
 func PodFitsCurrentNode(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, node *v1.Node) bool {
-    errors := NodeFit(nodeIndexer, pod, node)
-    if len(errors) == 0 {
+    err := NodeFit(nodeIndexer, pod, node)
+    if err == nil {
         klog.V(4).InfoS("Pod fits on node", "pod", klog.KObj(pod), "node", klog.KObj(node))
         return true
     }
 
     klog.V(4).InfoS("Pod does not fit on current node",
-        "pod:", klog.KObj(pod), "node:", klog.KObj(node), "error:", utilerrors.NewAggregate(errors).Error())
+        "pod", klog.KObj(pod), "node", klog.KObj(node), "error", err)
 
     return false
 }
@@ -200,9 +223,7 @@ func IsNodeUnschedulable(node *v1.Node) bool {

 // fitsRequest determines if a pod can fit on a node based on its resource requests. It returns true if
 // the pod will fit.
-func fitsRequest(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, node *v1.Node) (bool, []error) {
-    var insufficientResources []error
-
+func fitsRequest(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, node *v1.Node) (bool, error) {
     // Get pod requests
     podRequests, _ := utils.PodRequestsAndLimits(pod)
     resourceNames := make([]v1.ResourceName, 0, len(podRequests))
@@ -212,25 +233,22 @@ func fitsRequest(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *v1.Pod, nod

     availableResources, err := nodeAvailableResources(nodeIndexer, node, resourceNames)
     if err != nil {
-        return false, []error{err}
+        return false, err
     }
 
-    podFitsOnNode := true
     for _, resource := range resourceNames {
         podResourceRequest := podRequests[resource]
         availableResource, ok := availableResources[resource]
         if !ok || podResourceRequest.MilliValue() > availableResource.MilliValue() {
-            insufficientResources = append(insufficientResources, fmt.Errorf("insufficient %v", resource))
-            podFitsOnNode = false
+            return false, fmt.Errorf("insufficient %v", resource)
        }
     }
     // check pod count; at least one pod slot must be available
     if availableResources[v1.ResourcePods].MilliValue() <= 0 {
-        insufficientResources = append(insufficientResources, fmt.Errorf("insufficient %v", v1.ResourcePods))
-        podFitsOnNode = false
+        return false, fmt.Errorf("insufficient %v", v1.ResourcePods)
     }
 
-    return podFitsOnNode, insufficientResources
+    return true, nil
 }
 
 // nodeAvailableResources returns resources mapped to the quantity available on the node.
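
The substance of the node.go change is twofold: NodeFit now short-circuits and returns a single error instead of accumulating a []error, and PodFitsAnyNode / PodFitsAnyOtherNode fan the per-node check out across workers with workqueue.ParallelizeUntil, cancelling a shared context as soon as one node fits. Below is a minimal, self-contained sketch of that early-stop pattern, not the descheduler's actual code: anyFits, items, and fits are hypothetical stand-ins for PodFitsAnyNode, the node slice, and NodeFit, and it assumes k8s.io/client-go is on the module path.

package main

import (
    "context"
    "errors"
    "fmt"
    "sync/atomic"

    "k8s.io/client-go/util/workqueue"
)

// anyFits reports whether fits returns nil for at least one item,
// checking items in parallel and stopping early on the first success.
func anyFits(items []string, fits func(string) error) bool {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var found int32
    check := func(i int) {
        if err := fits(items[i]); err != nil {
            return
        }
        atomic.AddInt32(&found, 1)
        // Cancelling stops ParallelizeUntil from dispatching further pieces.
        cancel()
    }
    // 100 workers mirrors the commit's workersCount constant.
    workqueue.ParallelizeUntil(ctx, 100, len(items), check)

    return atomic.LoadInt32(&found) > 0
}

func main() {
    nodes := []string{"node-a", "node-b", "node-c"}
    fits := func(name string) error {
        if name != "node-b" {
            return errors.New("does not fit") // pretend only node-b has room
        }
        return nil
    }
    fmt.Println(anyFits(nodes, fits)) // prints: true
}

One subtlety worth noting: cancel() only prevents ParallelizeUntil from handing out further pieces, so workers already mid-check still finish, and more than one of them may record a fit. Tallying an atomic int32 and testing filteredLen > 0, as the commit does, stays correct under that race, where concurrent writes to a plain bool would be a data race.
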
pkg/descheduler/node/node_test.go (3 additions, 3 deletions)

@@ -852,9 +852,9 @@ func TestNodeFit(t *testing.T) {

             sharedInformerFactory.Start(ctx.Done())
             sharedInformerFactory.WaitForCacheSync(ctx.Done())
-            errs := NodeFit(getPodsAssignedToNode, tc.pod, tc.node)
-            if (len(errs) == 0 && tc.err != nil) || (len(errs) > 0 && errs[0].Error() != tc.err.Error()) {
-                t.Errorf("Test %#v failed, got %v, expect %v", tc.description, errs, tc.err)
+            err = NodeFit(getPodsAssignedToNode, tc.pod, tc.node)
+            if (err == nil && tc.err != nil) || (err != nil && err.Error() != tc.err.Error()) {
+                t.Errorf("Test %#v failed, got %v, expect %v", tc.description, err, tc.err)
             }
         })
     }
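
The test update mirrors the new single-error contract: one returned error is compared against the expected one. As a rough, self-contained illustration of that table-driven comparison idiom, with checkFit as a hypothetical stand-in for NodeFit rather than the repository's fixtures:

package main

import (
    "errors"
    "fmt"
)

// checkFit stands in for NodeFit: nil when the pod fits, a single
// descriptive error otherwise.
func checkFit(unschedulable bool) error {
    if unschedulable {
        return errors.New("node is not schedulable")
    }
    return nil
}

func main() {
    tests := []struct {
        description   string
        unschedulable bool
        err           error
    }{
        {"pod fits on schedulable node", false, nil},
        {"cordoned node rejects pod", true, errors.New("node is not schedulable")},
    }
    for _, tc := range tests {
        err := checkFit(tc.unschedulable)
        // Same shape as the updated assertion in TestNodeFit, plus a nil
        // guard for the case where an error arrives but none was expected.
        if (err == nil && tc.err != nil) || (err != nil && (tc.err == nil || err.Error() != tc.err.Error())) {
            fmt.Printf("Test %#v failed, got %v, expect %v\n", tc.description, err, tc.err)
        }
    }
}
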
