Skip to content

Commit

Permalink
Add the allocate logic for gpu sharing.
Browse files Browse the repository at this point in the history
  • Loading branch information
tizhou86 committed Jun 11, 2020
1 parent d6bc0b1 commit 7f9319d
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 10 deletions.
37 changes: 37 additions & 0 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,40 @@ func (ni *NodeInfo) GetDevicesAllGPUMemory() map[int]uint {
}
return res
}

func (ni *NodeInfo) GetDeviceCoreId(pod *v1.Pod) (int, bool) {

resId := -1
resOk := false
cMem := uint(0)
rMems := ni.GetDevicesRemainGPUMemory()

gpuReq := uint(0)

if len(pod.ObjectMeta.Annotations) > 0 {
req, ok := pod.ObjectMeta.Annotations["volcano.sh/pod-gpu-memory"]
if ok {
s, _ := strconv.Atoi(req)
gpuReq = uint(s)
}
}

if gpuReq > uint(0) {
if len(rMems) > 0 {
for i := 0; i < len(ni.Devices); i++ {
rMem, ok := rMems[i]
if ok {
if rMem >= gpuReq {
if resId == -1 || cMem > rMem {
resId = i
cMem = rMem
}
resOk = true
}
}
}
}
}

return resId, resOk
}
13 changes: 13 additions & 0 deletions pkg/scheduler/api/pod_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package api

import (
"fmt"

v1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -71,3 +73,14 @@ func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource {

return result
}

func UpdateGPUPod(oldPod *v1.Pod, coreId int, memoryPerCore int) (newPod *v1.Pod) {
newPod = oldPod.DeepCopy()
if len(newPod.ObjectMeta.Annotations) == 0 {
newPod.ObjectMeta.Annotations = map[string]string{}
}

newPod.ObjectMeta.Annotations["volcano.sh/gpu-core-id"] = fmt.Sprintf("%d", coreId)

return newPod
}
59 changes: 49 additions & 10 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package predicates
import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/api/core/v1"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
Expand Down Expand Up @@ -118,30 +120,69 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {

nodeMap, _ = util.GenerateNodeMapAndSlice(ssn.Nodes)

predicate := enablePredicate(pp.pluginArguments)

//TODO: (tizhou86) zhonghu will pass the client set into this function
clientSet := NewClientSetOrClientSetInstance()

// Register event handlers to update task info in PodLister & nodeMap
ssn.AddEventHandler(&framework.EventHandler{
AllocateFunc: func(event *framework.Event) {
pod := pl.UpdateTask(event.Task, event.Task.NodeName)

nodeName := event.Task.NodeName
node, found := nodeMap[nodeName]
if !found {
klog.Warningf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
nodeInfo, foundInfo := ssn.Nodes[nodeName]

if predicate.gpuSharingEnable {
if !foundInfo {
klog.Warningf("predicates with gpu sharing, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
} else {
coreId, found := nodeInfo.GetDeviceCoreId(pod)
if found {
var err error
pod, err = clientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
newPod := api.UpdateGPUPod(pod, coreId, nodeInfo.GPUTotalMemory/nodeInfo.GPUTotalCore)
_, err = clientSet.CoreV1().Pods(newPod.Namespace).Update(newPod)
} else {
klog.Errorf("The node %s can't place the pod %s in ns %s", pod.Spec.NodeName, pod.Name, pod.Namespace)
}

dev, found := nodeInfo.Devices[coreId]
if !found {

} else {
dev.PodMap[newPod.UID] = newPod
node.AddPod(pod)
}

klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName)
}
} else {
node.AddPod(pod)
klog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName)
if !found {
klog.Warningf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
} else {
node.AddPod(pod)
klog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName)
}
}

},
DeallocateFunc: func(event *framework.Event) {
pod := pl.UpdateTask(event.Task, "")

nodeName := event.Task.NodeName
node, found := nodeMap[nodeName]
if !found {
klog.Warningf("predicates, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)

if predicate.gpuSharingEnable {
//TODO: (tizhou86) add deallocate logic
} else {
node.RemovePod(pod)
klog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName)
if !found {
klog.Warningf("predicates, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
} else {
node.RemovePod(pod)
klog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName)
}
}
},
})
Expand All @@ -150,8 +191,6 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
Session: ssn,
}

predicate := enablePredicate(pp.pluginArguments)

ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo, found := nodeMap[node.Name]
if !found {
Expand Down

0 comments on commit 7f9319d

Please sign in to comment.