Add gpu sharing in predicate logic.
tizhou86 committed Jun 8, 2020
1 parent 26f5b61 commit 944a364
Showing 5 changed files with 242 additions and 2 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module volcano.sh/volcano

-go 1.13
+go 1.14

replace (
    k8s.io/api => k8s.io/api v0.0.0-20200131112707-d64dbec685a4
63 changes: 63 additions & 0 deletions pkg/scheduler/api/device_info.go
@@ -0,0 +1,63 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
    "strconv"

    v1 "k8s.io/api/core/v1"
)

// DeviceInfo stores the GPU-sharing state of a single GPU device on a node.
type DeviceInfo struct {
    Id             int
    PodMap         map[string]*v1.Pod
    GPUTotalMemory uint
}

// GetPods returns all pods currently tracked on this device.
func (di *DeviceInfo) GetPods() []*v1.Pod {
    pods := []*v1.Pod{}
    for _, pod := range di.PodMap {
        pods = append(pods, pod)
    }
    return pods
}

// NewDeviceInfo creates a DeviceInfo with the given device ID and total GPU memory.
func NewDeviceInfo(id int, mem uint) *DeviceInfo {
    return &DeviceInfo{
        Id:             id,
        GPUTotalMemory: mem,
        PodMap:         map[string]*v1.Pod{},
    }
}

// GetUsedGPUMemory sums the GPU memory requested by the non-terminated pods on
// this device, as declared through the "volcano.sh/pod-gpu-memory" annotation.
func (di *DeviceInfo) GetUsedGPUMemory() uint {
    res := uint(0)
    for _, pod := range di.PodMap {
        if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
            continue
        }
        if len(pod.ObjectMeta.Annotations) > 0 {
            mem, found := pod.ObjectMeta.Annotations["volcano.sh/pod-gpu-memory"]
            if found {
                // A malformed annotation value is treated as zero.
                m, _ := strconv.Atoi(mem)
                res += uint(m)
            }
        }
    }
    return res
}
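For reference, a minimal usage sketch of the new DeviceInfo type (not part of the commit). The PodMap key format, the memory unit, and the numbers below are illustrative assumptions only.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "volcano.sh/volcano/pkg/scheduler/api"
)

func main() {
    // Device 0 with 8192 units of GPU memory (the unit is whatever the device plugin reports).
    dev := api.NewDeviceInfo(0, 8192)

    running := &v1.Pod{}
    running.Annotations = map[string]string{"volcano.sh/pod-gpu-memory": "2048"}
    dev.PodMap["default/pod-a"] = running // key format is illustrative only

    finished := &v1.Pod{}
    finished.Annotations = map[string]string{"volcano.sh/pod-gpu-memory": "4096"}
    finished.Status.Phase = v1.PodSucceeded // succeeded/failed pods are skipped
    dev.PodMap["default/pod-b"] = finished

    used := dev.GetUsedGPUMemory()             // 2048: only the running pod counts
    fmt.Println(used, dev.GPUTotalMemory-used) // 2048 6144
}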
89 changes: 89 additions & 0 deletions pkg/scheduler/api/node_info.go
@@ -18,6 +18,7 @@ package api

import (
    "fmt"
    "strconv"

    v1 "k8s.io/api/core/v1"
    "k8s.io/klog"
@@ -46,6 +47,10 @@ type NodeInfo struct {

    Tasks map[TaskID]*TaskInfo

    // GPU sharing bookkeeping: one DeviceInfo per GPU device, plus node-level totals.
    Devices        map[int]*DeviceInfo
    GPUTotalCore   int
    GPUTotalMemory int

    // Used to store custom information
    Others map[string]interface{}
}
@@ -78,6 +83,10 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
            Capability: EmptyResource(),

            Tasks: make(map[TaskID]*TaskInfo),

            Devices:        make(map[int]*DeviceInfo),
            GPUTotalCore:   0,
            GPUTotalMemory: 0,
        }
    } else {
        ni = &NodeInfo{
@@ -93,9 +102,14 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
            Capability: NewResource(node.Status.Capacity),

            Tasks: make(map[TaskID]*TaskInfo),

            Devices:        make(map[int]*DeviceInfo),
            GPUTotalCore:   0,
            GPUTotalMemory: 0,
        }
    }

    ni.SetNodeGPUInfo(node)
    ni.setNodeState(node)

    return ni
@@ -154,6 +168,24 @@ func (ni *NodeInfo) setNodeState(node *v1.Node) {
    }
}

// SetNodeGPUInfo reads the GPU capacity advertised under
// "volcano.sh/node-gpu-core" and "volcano.sh/node-gpu-memory" and builds one
// DeviceInfo per GPU, splitting the total GPU memory evenly across devices.
func (ni *NodeInfo) SetNodeGPUInfo(node *v1.Node) {
    // Guard against the nil-node branch of NewNodeInfo.
    if node == nil {
        return
    }

    core, ok := node.Status.Capacity["volcano.sh/node-gpu-core"]
    if ok {
        ni.GPUTotalCore = int(core.Value())
    }

    mem, ok := node.Status.Capacity["volcano.sh/node-gpu-memory"]
    if ok {
        ni.GPUTotalMemory = int(mem.Value())
    }

    for i := 0; i < int(core.Value()); i++ {
        ni.Devices[i] = NewDeviceInfo(i, uint(int(mem.Value())/int(core.Value())))
    }
}
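A small sketch (not from the commit) of the even-split rule behind SetNodeGPUInfo: with hypothetical capacities of 4 GPU cores and 32768 total GPU memory, each DeviceInfo receives 32768/4.

package main

import "fmt"

func main() {
    // Hypothetical node-level capacities (normally read from node.Status.Capacity):
    //   volcano.sh/node-gpu-core:   4
    //   volcano.sh/node-gpu-memory: 32768
    gpuCores := 4
    gpuMemory := 32768

    // SetNodeGPUInfo gives every device the same share of the total memory.
    for i := 0; i < gpuCores; i++ {
        fmt.Printf("device %d: %d\n", i, gpuMemory/gpuCores) // 8192 per device
    }
}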

// SetNode sets kubernetes node object to nodeInfo object
func (ni *NodeInfo) SetNode(node *v1.Node) {
    ni.setNodeState(node)
@@ -314,3 +346,60 @@ func (ni *NodeInfo) Pods() (pods []*v1.Pod) {

    return
}

// CheckPredicatePodOnGPUNode returns true if at least one GPU device on this
// node has enough free memory for the pod's GPU memory request, taken from the
// "volcano.sh/pod-gpu-memory" annotation.
func (ni *NodeInfo) CheckPredicatePodOnGPUNode(pod *v1.Pod) bool {
    res := false
    memReq := uint(0)

    remainMems := ni.GetDevicesRemainGPUMemory()
    if len(pod.ObjectMeta.Annotations) > 0 {
        mem, found := pod.ObjectMeta.Annotations["volcano.sh/pod-gpu-memory"]
        if found {
            m, _ := strconv.Atoi(mem)
            memReq = uint(m)
        }
    }

    if len(remainMems) > 0 {
        for devID := 0; devID < len(ni.Devices); devID++ {
            availableGPU, ok := remainMems[devID]
            if ok && availableGPU >= memReq {
                res = true
                break
            }
        }
    }

    return res
}

// GetDevicesRemainGPUMemory returns the free GPU memory per device ID.
func (ni *NodeInfo) GetDevicesRemainGPUMemory() map[int]uint {
    devicesAllGPUMemory := ni.GetDevicesAllGPUMemory()
    devicesUsedGPUMemory := ni.GetDevicesUsedGPUMemory()
    res := map[int]uint{}
    for id, allMemory := range devicesAllGPUMemory {
        if usedMemory, found := devicesUsedGPUMemory[id]; found {
            res[id] = allMemory - usedMemory
        }
    }
    return res
}

// GetDevicesUsedGPUMemory returns the GPU memory already claimed by running
// pods, per device ID.
func (ni *NodeInfo) GetDevicesUsedGPUMemory() map[int]uint {
    res := map[int]uint{}
    for _, device := range ni.Devices {
        res[device.Id] = device.GetUsedGPUMemory()
    }
    return res
}

// GetDevicesAllGPUMemory returns the total GPU memory per device ID.
func (ni *NodeInfo) GetDevicesAllGPUMemory() map[int]uint {
    res := map[int]uint{}
    for _, device := range ni.Devices {
        res[device.Id] = device.GPUTotalMemory
    }
    return res
}
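To see how the new NodeInfo helpers fit together, here is a hedged end-to-end sketch (not part of the commit). The capacity values and memory unit are made up; the assumption that NewNodeInfo stores the node object and calls SetNodeGPUInfo follows from the diff above.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    "volcano.sh/volcano/pkg/scheduler/api"
)

func main() {
    // A node advertising 2 GPUs and 16384 units of GPU memory in total.
    node := &v1.Node{}
    node.Status.Capacity = v1.ResourceList{
        "volcano.sh/node-gpu-core":   resource.MustParse("2"),
        "volcano.sh/node-gpu-memory": resource.MustParse("16384"),
    }
    ni := api.NewNodeInfo(node) // populates Devices via SetNodeGPUInfo

    // A pod asking for 4096 units of GPU memory on a single device.
    pod := &v1.Pod{}
    pod.Annotations = map[string]string{"volcano.sh/pod-gpu-memory": "4096"}

    fmt.Println(ni.GetDevicesRemainGPUMemory())     // map[0:8192 1:8192]
    fmt.Println(ni.CheckPredicatePodOnGPUNode(pod)) // true
}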
43 changes: 42 additions & 1 deletion pkg/scheduler/plugins/predicates/predicates.go
@@ -17,6 +17,9 @@ limitations under the License.
package predicates

import (
    "fmt"

    "k8s.io/api/core/v1"
    "k8s.io/klog"
    "k8s.io/kubernetes/pkg/scheduler/algorithm"
    "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
@@ -37,6 +40,8 @@ const (
    DiskPressurePredicate = "predicate.DiskPressureEnable"
    // PIDPressurePredicate is the key for enabling PID Pressure Predicate in YAML
    PIDPressurePredicate = "predicate.PIDPressureEnable"
    // GPUSharingPredicate is the key for enabling GPU Sharing Predicate in YAML
    GPUSharingPredicate = "predicate.GPUSharingEnable"
)

type predicatesPlugin struct {
@@ -57,12 +62,13 @@ type predicateEnable struct {
    memoryPressureEnable bool
    diskPressureEnable   bool
    pidPressureEnable    bool
    gpuSharingEnable     bool
}

func enablePredicate(args framework.Arguments) predicateEnable {
    /*
-      User Should give predicatesEnable in this format(predicate.MemoryPressureEnable, predicate.DiskPressureEnable, predicate.PIDPressureEnable.
+      User Should give predicatesEnable in this format(predicate.MemoryPressureEnable, predicate.DiskPressureEnable, predicate.PIDPressureEnable, predicate.GPUSharingEnable.
       Currently supported only for MemoryPressure, DiskPressure, PIDPressure predicate checks.

       actions: "reclaim, allocate, backfill, preempt"
@@ -78,6 +84,7 @@ func enablePredicate(args framework.Arguments) predicateEnable {
             predicate.MemoryPressureEnable: true
             predicate.DiskPressureEnable: true
             predicate.PIDPressureEnable: true
             predicate.GPUSharingEnable: true
         - name: proportion
         - name: nodeorder
    */
@@ -86,6 +93,7 @@ func enablePredicate(args framework.Arguments) predicateEnable {
        memoryPressureEnable: false,
        diskPressureEnable:   false,
        pidPressureEnable:    false,
        gpuSharingEnable:     false,
    }

    // Checks whether predicate.MemoryPressureEnable is provided or not, if given, modifies the value in predicateEnable struct.
@@ -97,6 +105,9 @@ func enablePredicate(args framework.Arguments) predicateEnable {
    // Checks whether predicate.PIDPressureEnable is provided or not, if given, modifies the value in predicateEnable struct.
    args.GetBool(&predicate.pidPressureEnable, PIDPressurePredicate)

    // Checks whether predicate.GPUSharingEnable is provided or not, if given, modifies the value in predicateEnable struct.
    args.GetBool(&predicate.gpuSharingEnable, GPUSharingPredicate)

    return predicate
}
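For intuition only, a stand-alone sketch of the GetBool-style parsing that enablePredicate relies on. This is not the real framework.Arguments implementation; the map type and parsing rules shown here are assumptions.

package main

import (
    "fmt"
    "strconv"
)

// getBool mimics the behavior relied on above: if the key is present and
// parses as a bool, overwrite the target; otherwise leave the default alone.
func getBool(target *bool, args map[string]string, key string) {
    raw, ok := args[key]
    if !ok {
        return
    }
    if v, err := strconv.ParseBool(raw); err == nil {
        *target = v
    }
}

func main() {
    args := map[string]string{"predicate.GPUSharingEnable": "true"}

    gpuSharingEnable := false // default, as in enablePredicate
    getBool(&gpuSharingEnable, args, "predicate.GPUSharingEnable")
    fmt.Println(gpuSharingEnable) // true
}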

@@ -269,6 +280,21 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
            }
        }

        if predicate.gpuSharingEnable {
            // Check whether the pod's GPU memory request fits on one of the node's GPU devices.
            fit, reasons, err = CheckNodeGPUSharingPredicate(task.Pod, node)
            if err != nil {
                return err
            }

            klog.V(4).Infof("CheckNodeGPUSharingPredicate predicates Task <%s/%s> on Node <%s>: fit %t, err %v",
                task.Namespace, task.Name, node.Name, fit, err)

            if !fit {
                return api.NewFitErrorByReasons(task, node, reasons...)
            }
        }

        var lister algorithm.PodLister
        lister = pl
        if !util.HaveAffinity(task.Pod) {
@@ -294,3 +320,18 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
}

func (pp *predicatesPlugin) OnSessionClose(ssn *framework.Session) {}

// CheckNodeGPUSharingPredicate checks whether a GPU-sharing pod can be scheduled on a node.
func CheckNodeGPUSharingPredicate(pod *v1.Pod, nodeInfo *api.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
    // Nodes that do not advertise shared GPU cores cannot host GPU-sharing pods.
    _, ok := nodeInfo.Node.Status.Capacity["volcano.sh/node-gpu-core"]
    if !ok {
        return false, nil, fmt.Errorf("node does not support gpu sharing")
    }

    // Reject the pod if no single device has enough free GPU memory for its request.
    if !nodeInfo.CheckPredicatePodOnGPUNode(pod) {
        return false, nil, fmt.Errorf("not enough gpu memory on a single device")
    }

    return true, nil, nil
}
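Finally, a hedged sketch (not part of the commit) that exercises the predicate against a node lacking the volcano.sh/node-gpu-core capacity key. The import paths follow the repository layout above, and the printed error matches the message as written in this listing.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "volcano.sh/volcano/pkg/scheduler/api"
    gpupredicates "volcano.sh/volcano/pkg/scheduler/plugins/predicates"
)

func main() {
    // A node that does not advertise "volcano.sh/node-gpu-core": the predicate
    // rejects it before any memory accounting happens.
    plainNode := &v1.Node{}
    ni := api.NewNodeInfo(plainNode)

    pod := &v1.Pod{}
    pod.Annotations = map[string]string{"volcano.sh/pod-gpu-memory": "1024"}

    fit, _, err := gpupredicates.CheckNodeGPUSharingPredicate(pod, ni)
    fmt.Println(fit, err) // false, "node does not support gpu sharing"
}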
