Skip to content

Commit

Permalink
Merge pull request kubernetes-retired#440 from k82cn/kb_424
Browse files Browse the repository at this point in the history
Updated node info if taints and lables changed.
  • Loading branch information
k8s-ci-robot authored Oct 16, 2018
2 parents 451ef64 + a783654 commit be860e1
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ func (sc *SchedulerCache) addNode(node *v1.Node) error {
}

func isNodeInfoUpdated(oldNode, newNode *v1.Node) bool {
return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) ||
!reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) ||
!reflect.DeepEqual(oldNode.Labels, newNode.Labels)
}

// Assumes that lock is already acquired.
Expand Down
39 changes: 39 additions & 0 deletions test/e2e/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,43 @@ var _ = Describe("Predicates E2E Test", func() {
Expect(pod.Spec.NodeName).To(Equal(nodeName))
}
})

It("Taints/Tolerations", func() {
context := initTestContext()
defer cleanupTestContext(context)

taints := []v1.Taint{
{
Key: "test-taint-key",
Value: "test-taint-val",
Effect: v1.TaintEffectNoSchedule,
},
}

err := taintAllNodes(context, taints)
Expect(err).NotTo(HaveOccurred())

job := &jobSpec{
name: "tt-job",
tasks: []taskSpec{
{
img: "nginx",
req: oneCPU,
min: 1,
rep: 1,
},
},
}

_, pg := createJobEx(context, job)
err = waitPodGroupPending(context, pg)
Expect(err).NotTo(HaveOccurred())

err = removeTaintsFromAllNodes(context, taints)
Expect(err).NotTo(HaveOccurred())

err = waitPodGroupReady(context, pg)
Expect(err).NotTo(HaveOccurred())
})

})
89 changes: 89 additions & 0 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package e2e

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand All @@ -33,6 +34,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -640,3 +643,89 @@ func getPodOfPodGroup(ctx *context, pg *arbv1.PodGroup) []*v1.Pod {

return qjpod
}

func taintAllNodes(ctx *context, taints []v1.Taint) error {
nodes, err := ctx.kubeclient.CoreV1().Nodes().List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())

for _, node := range nodes.Items {
newNode := node.DeepCopy()

newTaints := newNode.Spec.Taints
for _, t := range taints {
found := false
for _, nt := range newTaints {
if nt.Key == t.Key {
found = true
break
}
}

if !found {
newTaints = append(newTaints, t)
}
}

newNode.Spec.Taints = newTaints

patchBytes, err := preparePatchBytesforNode(node.Name, &node, newNode)
Expect(err).NotTo(HaveOccurred())

_, err = ctx.kubeclient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
Expect(err).NotTo(HaveOccurred())
}

return nil
}

func removeTaintsFromAllNodes(ctx *context, taints []v1.Taint) error {
nodes, err := ctx.kubeclient.CoreV1().Nodes().List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())

for _, node := range nodes.Items {
newNode := node.DeepCopy()

var newTaints []v1.Taint
for _, nt := range newTaints {
found := false
for _, t := range taints {
if nt.Key == t.Key {
found = true
break
}
}

if !found {
newTaints = append(newTaints, nt)
}
}
newNode.Spec.Taints = newTaints

patchBytes, err := preparePatchBytesforNode(node.Name, &node, newNode)
Expect(err).NotTo(HaveOccurred())

_, err = ctx.kubeclient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
Expect(err).NotTo(HaveOccurred())
}

return nil
}

func preparePatchBytesforNode(nodeName string, oldNode *v1.Node, newNode *v1.Node) ([]byte, error) {
oldData, err := json.Marshal(oldNode)
if err != nil {
return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err)
}

newData, err := json.Marshal(newNode)
if err != nil {
return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err)
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
if err != nil {
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err)
}

return patchBytes, nil
}

0 comments on commit be860e1

Please sign in to comment.