diff --git a/Gopkg.lock b/Gopkg.lock index 8165d21a8f..9560e925f9 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -204,7 +204,7 @@ [[projects]] branch = "master" - digest = "1:4281fb8a5ed4671cd7e80c85599e00912f796819716abaefd697797ec9eb4491" + digest = "1:ea789d69109979142a6fbd569095f7678ea62b633b9b5a284f2fd1ff673da48f" name = "github.com/kubernetes-sigs/kube-batch" packages = [ "cmd/kube-batch/app", @@ -247,7 +247,7 @@ "pkg/version", ] pruneopts = "UT" - revision = "700381278a3e7da396507055d371dd93c1724322" + revision = "4b391ab34b53779e47243217006d8772cb86d8d8" source = "https://github.com/volcano-sh/scheduler" [[projects]] diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go b/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go index f0e1491d72..1fbe6f415e 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go @@ -103,7 +103,7 @@ func Run(opt *options.ServerOption) error { // Prepare event clients. 
broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: leaderElectionClient.CoreV1().Events(opt.LockObjectNamespace)}) - eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kube-batch"}) + eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: opt.SchedulerName}) hostname, err := os.Hostname() if err != nil { diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go index 16257d4140..679818db51 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go @@ -19,6 +19,8 @@ package api import ( "fmt" + "github.com/golang/glog" + v1 "k8s.io/api/core/v1" ) @@ -27,6 +29,9 @@ type NodeInfo struct { Name string Node *v1.Node + // The state of node + State NodeState + // The releasing resource on that node Releasing *Resource // The idle resource on that node @@ -44,10 +49,18 @@ type NodeInfo struct { Other interface{} } +// NodeState defines the current state of node. 
+type NodeState struct { + Phase NodePhase + Reason string +} + // NewNodeInfo is used to create new nodeInfo object func NewNodeInfo(node *v1.Node) *NodeInfo { + var ni *NodeInfo + if node == nil { - return &NodeInfo{ + ni = &NodeInfo{ Releasing: EmptyResource(), Idle: EmptyResource(), Used: EmptyResource(), @@ -57,21 +70,25 @@ func NewNodeInfo(node *v1.Node) *NodeInfo { Tasks: make(map[TaskID]*TaskInfo), } - } - - return &NodeInfo{ - Name: node.Name, - Node: node, + } else { + ni = &NodeInfo{ + Name: node.Name, + Node: node, - Releasing: EmptyResource(), - Idle: NewResource(node.Status.Allocatable), - Used: EmptyResource(), + Releasing: EmptyResource(), + Idle: NewResource(node.Status.Allocatable), + Used: EmptyResource(), - Allocatable: NewResource(node.Status.Allocatable), - Capability: NewResource(node.Status.Capacity), + Allocatable: NewResource(node.Status.Allocatable), + Capability: NewResource(node.Status.Capacity), - Tasks: make(map[TaskID]*TaskInfo), + Tasks: make(map[TaskID]*TaskInfo), + } } + + ni.setNodeState(node) + + return ni } // Clone used to clone nodeInfo Object @@ -85,8 +102,47 @@ func (ni *NodeInfo) Clone() *NodeInfo { return res } +// Ready returns whether node is ready for scheduling +func (ni *NodeInfo) Ready() bool { + return ni.State.Phase == Ready +} + +func (ni *NodeInfo) setNodeState(node *v1.Node) { + // If node is nil, the node is un-initialized in cache + if node == nil { + ni.State = NodeState{ + Phase: NotReady, + Reason: "UnInitialized", + } + return + } + + // set NodeState according to resources + if !ni.Used.LessEqual(NewResource(node.Status.Allocatable)) { + ni.State = NodeState{ + Phase: NotReady, + Reason: "OutOfSync", + } + return + } + + // Node is ready (ignore node conditions because of taint/toleration) + ni.State = NodeState{ + Phase: Ready, + Reason: "", + } +} + // SetNode sets kubernetes node object to nodeInfo object func (ni *NodeInfo) SetNode(node *v1.Node) { + ni.setNodeState(node) + + if !ni.Ready() { + 
glog.Warningf("Failed to set node info, phase: %s, reason: %s", + ni.State.Phase, ni.State.Reason) + return + } + ni.Name = node.Name ni.Node = node @@ -176,16 +232,16 @@ func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error { // String returns nodeInfo details in string format func (ni NodeInfo) String() string { - res := "" + tasks := "" i := 0 for _, task := range ni.Tasks { - res = res + fmt.Sprintf("\n\t %d: %v", i, task) + tasks = tasks + fmt.Sprintf("\n\t %d: %v", i, task) i++ } - return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>, taints <%v>%s", - ni.Name, ni.Idle, ni.Used, ni.Releasing, ni.Node.Spec.Taints, res) + return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>, state <phase %s, reason %s>, taints <%v>%s", + ni.Name, ni.Idle, ni.Used, ni.Releasing, ni.State.Phase, ni.State.Reason, ni.Node.Spec.Taints, tasks) } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go index 97a583df6c..a88439e451 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go @@ -78,6 +78,27 @@ func (ts TaskStatus) String() string { } } +// NodePhase defines the phase of node +type NodePhase int + +const ( + // Ready means the node is ready for scheduling + Ready NodePhase = 1 << iota + // NotReady means the node is not ready for scheduling + NotReady +) + +func (np NodePhase) String() string { + switch np { + case Ready: + return "Ready" + case NotReady: + return "NotReady" + } + + return "Unknown" +} + // validateStatusUpdate validates whether the status transfer is valid. 
func validateStatusUpdate(oldStatus, newStatus TaskStatus) error { return nil diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go index f6f1c04d2e..322d3b4d02 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go @@ -200,7 +200,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s // Prepare event clients. broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: sc.kubeclient.CoreV1().Events("")}) - sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kube-batch"}) + sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName}) sc.Binder = &defaultBinder{ kubeclient: sc.kubeclient, @@ -465,7 +465,10 @@ func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string) pod := task.Pod.DeepCopy() - sc.Recorder.Eventf(pod, v1.EventTypeWarning, string(v1.PodReasonUnschedulable), message) + // The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in + // k8s core, so using the same string here. 
+ // The reason field in PodCondition should be "Unschedulable" + sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message) if _, err := sc.StatusUpdater.UpdatePodCondition(pod, &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, @@ -546,6 +549,10 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { } for _, value := range sc.Nodes { + if !value.Ready() { + continue + } + snapshot.Nodes[value.Name] = value.Clone() } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/event_handlers.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/event_handlers.go index 4260521fdc..3b005a2fee 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/event_handlers.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/event_handlers.go @@ -18,7 +18,6 @@ package cache import ( "fmt" - "reflect" "github.com/golang/glog" @@ -269,19 +268,10 @@ func (sc *SchedulerCache) addNode(node *v1.Node) error { return nil } -func isNodeInfoUpdated(oldNode, newNode *v1.Node) bool { - return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) || - !reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) || - !reflect.DeepEqual(oldNode.Labels, newNode.Labels) || - !reflect.DeepEqual(oldNode.Spec.Unschedulable, newNode.Spec.Unschedulable) -} - // Assumes that lock is already acquired. func (sc *SchedulerCache) updateNode(oldNode, newNode *v1.Node) error { if sc.Nodes[newNode.Name] != nil { - if isNodeInfoUpdated(oldNode, newNode) { - sc.Nodes[newNode.Name].SetNode(newNode) - } + sc.Nodes[newNode.Name].SetNode(newNode) return nil }