Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize pods on unschedulable nodes #53

Merged
merged 2 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 72 additions & 18 deletions operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,12 +506,12 @@ func (o *Operator) getPodToUpdate(pods []v1.Pod, sts *appsv1.StatefulSet, sr Sta
return nil, nil
}

prioritizedNodes, err := o.getPrioritizedNodes()
prioritizedNodes, unschedulableNodes, err := o.getNodes()
if err != nil {
return nil, err
}

prioritizedPods, err := prioritizePodsForUpdate(pods, sts, sr, prioritizedNodes)
prioritizedPods, err := prioritizePodsForUpdate(pods, sts, sr, prioritizedNodes, unschedulableNodes)
if err != nil {
return nil, err
}
Expand All @@ -525,21 +525,27 @@ func (o *Operator) getPodToUpdate(pods []v1.Pod, sts *appsv1.StatefulSet, sr Sta
return &prioritizedPods[0], nil
}

// getPrioritizedNodes gets all nodes matching the priority node selector.
func (o *Operator) getPrioritizedNodes() (map[string]v1.Node, error) {
opts := metav1.ListOptions{
LabelSelector: o.priorityNodeSelectors.String(),
}
// getNodes gets all nodes matching the priority node selector and all nodes
// that are marked unschedulable.
func (o *Operator) getNodes() (map[string]v1.Node, map[string]v1.Node, error) {
opts := metav1.ListOptions{}
nodes, err := o.kube.CoreV1().Nodes().List(opts)
if err != nil {
return nil, err
return nil, nil, err
}

nodesMap := make(map[string]v1.Node, len(nodes.Items))
priorityNodesMap := make(map[string]v1.Node, len(nodes.Items))
unschedulableNodesMap := make(map[string]v1.Node, len(nodes.Items))
for _, node := range nodes.Items {
nodesMap[node.Name] = node
if len(node.Labels) > 0 && labels.AreLabelsInWhiteList(o.priorityNodeSelectors, labels.Set(node.Labels)) {
priorityNodesMap[node.Name] = node
}

if node.Spec.Unschedulable {
unschedulableNodesMap[node.Name] = node
}
}
return nodesMap, nil
return priorityNodesMap, unschedulableNodesMap, nil
}

// annotatePod annotates the Pod with the specified annotation key and value.
Expand Down Expand Up @@ -623,20 +629,54 @@ type updatePriority struct {
}

const (
podDrainingPriority = 8
nodeSelectorPriority = 4
podOldRevisionPriority = 2
stsReplicaDiffPriority = 1
podDrainingPriority = 16
unschedulableNodePriority = 8
nodeSelectorPriority = 4
podOldRevisionPriority = 2
stsReplicaDiffPriority = 1
// priorityNames
podDrainingPriorityName = "PodDraining"
unschedulableNodePriorityName = "UnschedulableNode"
nodeSelectorPriorityName = "NodeSelector"
podOldRevisionPriorityName = "PodOldRevision"
stsReplicaDiffPriorityName = "STSReplicaDiff"
)

func prioToName(priority int) string {
switch priority {
case podDrainingPriority:
return podDrainingPriorityName
case unschedulableNodePriority:
return unschedulableNodePriorityName
case nodeSelectorPriority:
return nodeSelectorPriorityName
case podOldRevisionPriority:
return podOldRevisionPriorityName
case stsReplicaDiffPriority:
return stsReplicaDiffPriorityName
default:
return ""
}
}

func priorityNames(priority int) []string {
priorities := make([]string, 0)
for _, prio := range []int{podDrainingPriority, unschedulableNodePriority, nodeSelectorPriority, podOldRevisionPriority, stsReplicaDiffPriority} {
if priority >= prio {
priorities = append(priorities, prioToName(prio))
}
}
return priorities
}

// prioritizePodsForUpdate prioritizes Pods to update next. The Pods are
// prioritized based on the following rules:
//
// 1. Pods already marked draining get highest priority.
// 2. Pods NOT on a priority node get high priority.
// 3. Pods not up to date with StatefulSet revision get high priority.
// 4. Pods part of a StatefulSet where desired replicas != actual replicas get medium priority.
func prioritizePodsForUpdate(pods []v1.Pod, sts *appsv1.StatefulSet, sr StatefulResource, nodes map[string]v1.Node) ([]v1.Pod, error) {
func prioritizePodsForUpdate(pods []v1.Pod, sts *appsv1.StatefulSet, sr StatefulResource, priorityNodes, unschedulableNodes map[string]v1.Node) ([]v1.Pod, error) {
priorities := make([]*updatePriority, 0, len(pods))
for _, pod := range pods {
ordinal := strings.TrimPrefix(pod.Name, pod.GenerateName)
Expand All @@ -661,8 +701,16 @@ func prioritizePodsForUpdate(pods []v1.Pod, sts *appsv1.StatefulSet, sr Stateful
continue
}

// if Pod is on an unschedulable node it gets high priority.
// An unschedulable node indicates that it is about to be
// drained, so we should priorities moving pods away from the
// node.
if _, ok := unschedulableNodes[pod.Spec.NodeName]; ok {
prio.Priority += unschedulableNodePriority
}

// if Pod is NOT on a priority selected node it gets high priority.
if _, ok := nodes[pod.Spec.NodeName]; !ok {
if _, ok := priorityNodes[pod.Spec.NodeName]; !ok {
prio.Priority += nodeSelectorPriority
}

Expand Down Expand Up @@ -705,7 +753,13 @@ func prioritizePodsForUpdate(pods []v1.Pod, sts *appsv1.StatefulSet, sr Stateful
// StatefulSet which is currently updating, but it does not
// mean the Pod itself needs to be updated.
if prio.Priority > 1 {
log.Infof("Pod %s/%s should be updated. Priority: %d", prio.Pod.Namespace, prio.Pod.Name, prio.Priority)
log.Infof(
"Pod %s/%s should be updated. Priority: %d (%s)",
prio.Pod.Namespace,
prio.Pod.Name,
prio.Priority,
strings.Join(priorityNames(prio.Priority), ","),
)
sortedPods = append(sortedPods, prio.Pod)
}
}
Expand Down
38 changes: 32 additions & 6 deletions operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,21 @@ func TestPrioritizePodsForUpdate(t *testing.T) {
},
}

stsPod2 := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "sts-2",
GenerateName: "sts-",
Annotations: map[string]string{},
Labels: map[string]string{
controllerRevisionHashLabelKey: "hash",
},
Namespace: "default",
},
Spec: v1.PodSpec{
NodeName: "node3",
},
}

podNoNode := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "sts-2",
Expand Down Expand Up @@ -143,34 +158,45 @@ func TestPrioritizePodsForUpdate(t *testing.T) {
},
}

nodes := map[string]v1.Node{
priorityNodes := map[string]v1.Node{
"node2": {},
}

unschedulableNodes := map[string]v1.Node{
"node3": {},
}

pods := []v1.Pod{updatingPod}

sortedPods, err := prioritizePodsForUpdate(pods, sts, sr, nodes)
sortedPods, err := prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 1)
assert.Equal(t, updatingPod, sortedPods[0])

// updating pod should be prioritized over stsPod
pods = []v1.Pod{stsPod, updatingPod}
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, nodes)
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 2)
assert.Equal(t, updatingPod, sortedPods[0])

// stsPods should be sorted by ordinal number
pods = []v1.Pod{stsPod, stsPod0}
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, nodes)
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 2)
assert.Equal(t, stsPod0, sortedPods[0])

// pods on unschedulable nodes should get higher priority
pods = []v1.Pod{stsPod, stsPod2}
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 2)
assert.Equal(t, stsPod2, sortedPods[0])

// don't prioritize pods not on a node.
pods = []v1.Pod{podNoNode}
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, nodes)
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 0)

Expand All @@ -191,7 +217,7 @@ func TestPrioritizePodsForUpdate(t *testing.T) {
}

pods = []v1.Pod{podUpToDate}
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, nodes)
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 0)
}
Expand Down