Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gpu sharing in predicate logic. #852

Merged
merged 5 commits into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion installer/helm/chart/volcano/templates/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ rules:
verbs: ["create", "list", "watch", "update", "patch"]
- apiGroups: [""]
resources: ["pods", "pods/status"]
verbs: ["create", "get", "list", "watch", "update", "bind", "updateStatus", "delete"]
verbs: ["create", "get", "list", "watch", "update", "patch", "bind", "updateStatus", "delete"]
- apiGroups: [""]
resources: ["pods/binding"]
verbs: ["create"]
Expand Down
2 changes: 1 addition & 1 deletion installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ rules:
verbs: ["create", "list", "watch", "update", "patch"]
- apiGroups: [""]
resources: ["pods", "pods/status"]
verbs: ["create", "get", "list", "watch", "update", "bind", "updateStatus", "delete"]
verbs: ["create", "get", "list", "watch", "update", "patch", "bind", "updateStatus", "delete"]
- apiGroups: [""]
resources: ["pods/binding"]
verbs: ["create"]
Expand Down
72 changes: 72 additions & 0 deletions pkg/scheduler/api/device_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2020 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
v1 "k8s.io/api/core/v1"
)

// GPUDevice describes a single GPU card of a node: its ID, its memory and the
// pods that are currently sharing it.
type GPUDevice struct {
// ID is the index of the GPU card on the node.
ID int
// PodMap holds the pods that are sharing this GPU.
// AddGPUResource stores entries under string(pod.UID).
PodMap map[string]*v1.Pod
// Memory is the memory available on this single card.
// NOTE(review): the unit is whatever the VolcanoGPUResource quantity uses — confirm.
Memory uint
}

// NewGPUDevice builds a GPUDevice with the given card ID and per-card memory.
// The returned device starts with an empty, non-nil PodMap so pods can be
// registered on it immediately.
func NewGPUDevice(id int, mem uint) *GPUDevice {
	dev := new(GPUDevice)
	dev.ID = id
	dev.Memory = mem
	dev.PodMap = make(map[string]*v1.Pod)
	return dev
}

// getUsedGPUMemory sums the GPU memory requested by every pod registered on
// the device, ignoring pods whose phase is Succeeded or Failed.
func (g *GPUDevice) getUsedGPUMemory() uint {
	var used uint
	for _, p := range g.PodMap {
		switch p.Status.Phase {
		case v1.PodSucceeded, v1.PodFailed:
			// Finished pods no longer count against the device.
		default:
			used += GetGPUResourceOfPod(p)
		}
	}
	return used
}

// GetGPUResourceOfPod returns the total GPU resource requested by the pod,
// computed as the sum over all of its regular containers.
func GetGPUResourceOfPod(pod *v1.Pod) uint {
	total := uint(0)
	containers := pod.Spec.Containers
	for i := range containers {
		total += getGPUResourceOfContainer(&containers[i])
	}
	return total
}

// getGPUResourceOfContainer returns the GPU resource required by the
// container, read from its VolcanoGPUResource limit. It returns 0 when the
// container declares no such limit.
func getGPUResourceOfContainer(container *v1.Container) uint {
	val, ok := container.Resources.Limits[VolcanoGPUResource]
	if !ok {
		return 0
	}
	return uint(val.Value())
}
135 changes: 105 additions & 30 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type NodeInfo struct {
Tasks map[TaskID]*TaskInfo

// Used to store custom information
Others map[string]interface{}
Others map[string]interface{}
GPUDevices map[int]*GPUDevice
hzxuzhonghu marked this conversation as resolved.
Show resolved Hide resolved
}

// FutureIdle returns resources that will be idle in the future:
Expand All @@ -65,40 +66,31 @@ type NodeState struct {

// NewNodeInfo is used to create new nodeInfo object
func NewNodeInfo(node *v1.Node) *NodeInfo {
var ni *NodeInfo
nodeinfo := &NodeInfo{
Releasing: EmptyResource(),
Pipelined: EmptyResource(),
Idle: EmptyResource(),
Used: EmptyResource(),

if node == nil {
ni = &NodeInfo{
Releasing: EmptyResource(),
Pipelined: EmptyResource(),
Idle: EmptyResource(),
Used: EmptyResource(),

Allocatable: EmptyResource(),
Capability: EmptyResource(),

Tasks: make(map[TaskID]*TaskInfo),
}
} else {
ni = &NodeInfo{
Name: node.Name,
Node: node,

Releasing: EmptyResource(),
Pipelined: EmptyResource(),
Idle: NewResource(node.Status.Allocatable),
Used: EmptyResource(),
Allocatable: EmptyResource(),
Capability: EmptyResource(),

Allocatable: NewResource(node.Status.Allocatable),
Capability: NewResource(node.Status.Capacity),
Tasks: make(map[TaskID]*TaskInfo),

Tasks: make(map[TaskID]*TaskInfo),
}
GPUDevices: make(map[int]*GPUDevice),
}

ni.setNodeState(node)
if node != nil {
nodeinfo.Name = node.Name
nodeinfo.Node = node
nodeinfo.Idle = NewResource(node.Status.Allocatable)
nodeinfo.Allocatable = NewResource(node.Status.Allocatable)
nodeinfo.Capability = NewResource(node.Status.Capacity)
}
nodeinfo.setNodeGPUInfo(node)
nodeinfo.setNodeState(node)

return ni
return nodeinfo
}

// Clone used to clone nodeInfo Object
Expand All @@ -108,7 +100,6 @@ func (ni *NodeInfo) Clone() *NodeInfo {
for _, p := range ni.Tasks {
res.AddTask(p)
}
res.Others = ni.Others
return res
}

Expand Down Expand Up @@ -154,9 +145,36 @@ func (ni *NodeInfo) setNodeState(node *v1.Node) {
}
}

// setNodeGPUInfo populates ni.GPUDevices from the node's reported capacity.
// It does nothing when node is nil, or when either VolcanoGPUResource or
// VolcanoGPUNumber is missing from node.Status.Capacity. A GPU count of zero
// is logged as invalid and ignored.
func (ni *NodeInfo) setNodeGPUInfo(node *v1.Node) {
if node == nil {
return
}
// Total GPU memory advertised by the node.
memory, ok := node.Status.Capacity[VolcanoGPUResource]
if !ok {
return
}
totalMemory := memory.Value()

// Number of GPU cards advertised by the node.
res, ok := node.Status.Capacity[VolcanoGPUNumber]
if !ok {
return
}
gpuNumber := res.Value()
if gpuNumber == 0 {
// Guard against dividing by zero below.
klog.Warningf("invalid %s=%s", VolcanoGPUNumber, res.String())
return
}

// The total memory is split evenly across cards (integer division).
memoryPerCard := uint(totalMemory / gpuNumber)
hzxuzhonghu marked this conversation as resolved.
Show resolved Hide resolved
// Devices are indexed 0..gpuNumber-1; any existing entry at an index is
// replaced by a fresh device with an empty PodMap.
for i := 0; i < int(gpuNumber); i++ {
ni.GPUDevices[i] = NewGPUDevice(i, memoryPerCard)
}
}

// SetNode sets kubernetes node object to nodeInfo object
func (ni *NodeInfo) SetNode(node *v1.Node) {
ni.setNodeState(node)
ni.setNodeGPUInfo(node)

if !ni.Ready() {
klog.Warningf("Failed to set node info, phase: %s, reason: %s",
Expand All @@ -180,11 +198,13 @@ func (ni *NodeInfo) SetNode(node *v1.Node) {
ni.Idle.Sub(ti.Resreq)
ni.Releasing.Add(ti.Resreq)
ni.Used.Add(ti.Resreq)
ni.AddGPUResource(ti.Pod)
case Pipelined:
ni.Pipelined.Add(ti.Resreq)
default:
ni.Idle.Sub(ti.Resreq)
ni.Used.Add(ti.Resreq)
ni.AddGPUResource(ti.Pod)
}
}
}
Expand Down Expand Up @@ -225,13 +245,15 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
}
ni.Releasing.Add(ti.Resreq)
ni.Used.Add(ti.Resreq)
ni.AddGPUResource(ti.Pod)
case Pipelined:
ni.Pipelined.Add(ti.Resreq)
default:
if err := ni.allocateIdleResource(ti); err != nil {
return err
}
ni.Used.Add(ti.Resreq)
ni.AddGPUResource(ti.Pod)
}
}

Expand Down Expand Up @@ -261,11 +283,13 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
ni.Releasing.Sub(task.Resreq)
ni.Idle.Add(task.Resreq)
ni.Used.Sub(task.Resreq)
ni.AddGPUResource(ti.Pod)
case Pipelined:
ni.Pipelined.Sub(task.Resreq)
default:
ni.Idle.Add(task.Resreq)
ni.Used.Sub(task.Resreq)
ni.SubGPUResource(ti.Pod)
}
}

Expand Down Expand Up @@ -314,3 +338,54 @@ func (ni *NodeInfo) Pods() (pods []*v1.Pod) {

return
}

// GetDevicesIdleGPUMemory returns the idle GPU memory of every GPU card on the
// node, keyed by GPU ID.
//
// Idle memory is the card's total memory minus the memory requested by the
// pods sharing it. The values are unsigned, so a card whose pods request more
// than its capacity is reported as having 0 idle memory instead of letting the
// subtraction wrap around to a huge positive number.
func (ni *NodeInfo) GetDevicesIdleGPUMemory() map[int]uint {
	devicesAllGPUMemory := ni.getDevicesAllGPUMemory()
	devicesUsedGPUMemory := ni.getDevicesUsedGPUMemory()
	res := make(map[int]uint, len(devicesAllGPUMemory))
	for id, allMemory := range devicesAllGPUMemory {
		usedMemory, found := devicesUsedGPUMemory[id]
		if !found {
			continue
		}
		if usedMemory >= allMemory {
			// Over-committed (or exactly full) card: nothing is idle.
			res[id] = 0
			continue
		}
		res[id] = allMemory - usedMemory
	}
	return res
}

// getDevicesUsedGPUMemory returns, for each GPU device of the node, the GPU
// memory currently used on it, keyed by the device ID.
func (ni *NodeInfo) getDevicesUsedGPUMemory() map[int]uint {
	used := make(map[int]uint, len(ni.GPUDevices))
	for _, dev := range ni.GPUDevices {
		used[dev.ID] = dev.getUsedGPUMemory()
	}
	return used
}

// getDevicesAllGPUMemory returns the total memory of each GPU device of the
// node, keyed by the device ID.
func (ni *NodeInfo) getDevicesAllGPUMemory() map[int]uint {
	total := make(map[int]uint, len(ni.GPUDevices))
	for _, dev := range ni.GPUDevices {
		total[dev.ID] = dev.Memory
	}
	return total
}

// AddGPUResource registers the pod on the GPU device named by its GPU index
// annotation. Pods that request no GPU resource, and pods whose index does not
// match a known device, are ignored.
func (ni *NodeInfo) AddGPUResource(pod *v1.Pod) {
	if GetGPUResourceOfPod(pod) == 0 {
		return
	}
	dev := ni.GPUDevices[GetGPUIndex(pod)]
	if dev == nil {
		return
	}
	dev.PodMap[string(pod.UID)] = pod
}

// SubGPUResource frees the GPU held by the pod by removing the pod from the
// device named by its GPU index annotation. Pods that request no GPU resource,
// and pods whose index does not match a known device, are ignored.
func (ni *NodeInfo) SubGPUResource(pod *v1.Pod) {
	if GetGPUResourceOfPod(pod) == 0 {
		return
	}
	dev := ni.GPUDevices[GetGPUIndex(pod)]
	if dev == nil {
		return
	}
	delete(dev.PodMap, string(pod.UID))
}
3 changes: 3 additions & 0 deletions pkg/scheduler/api/node_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p2": NewTaskInfo(case01Pod2),
},
GPUDevices: make(map[int]*GPUDevice),
},
},
{
Expand All @@ -79,6 +80,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
Capability: buildResource("2000m", "1G"),
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{},
GPUDevices: make(map[int]*GPUDevice),
},
expectedFailure: true,
},
Expand Down Expand Up @@ -138,6 +140,7 @@ func TestNodeInfo_RemovePod(t *testing.T) {
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p3": NewTaskInfo(case01Pod3),
},
GPUDevices: make(map[int]*GPUDevice),
},
},
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/scheduler/api/pod_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ limitations under the License.
package api

import (
"fmt"
"strconv"
"strings"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/klog"
)

// Refer k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go#GetResourceRequest.
Expand Down Expand Up @@ -71,3 +77,41 @@ func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource {

return result
}

// GetGPUIndex returns the GPU ID stored in the pod's GPUIndex annotation.
// It returns -1 when the annotation is absent, and logs an error and returns
// -1 when the annotation value is not a valid integer.
func GetGPUIndex(pod *v1.Pod) int {
	// Looking up a key in a nil or empty map simply reports "not found".
	value, found := pod.Annotations[GPUIndex]
	if !found {
		return -1
	}

	id, err := strconv.Atoi(value)
	if err != nil {
		klog.Errorf("invalid %s=%s", GPUIndex, value)
		return -1
	}
	return id
}

// escapeJSONPointer escapes a reference token for use in a JSON Pointer as
// described by https://tools.ietf.org/html/rfc6901: "~" becomes "~0" and "/"
// becomes "~1".
func escapeJSONPointer(p string) string {
	// A single pass is enough: neither replacement produces a character that
	// the other rule would rewrite into something different.
	escaper := strings.NewReplacer("~", "~0", "/", "~1")
	return escaper.Replace(p)
}

// AddGPUIndexPatch returns a JSON Patch document that adds two pod
// annotations: PredicateTime, set to the current Unix time in nanoseconds, and
// GPUIndex, set to the given GPU id. Both values are written as JSON strings.
func AddGPUIndexPatch(id int) string {
	const patchTemplate = `[{"op": "add", "path": "/metadata/annotations/%s", "value":"%d"},` +
		`{"op": "add", "path": "/metadata/annotations/%s", "value": "%d"}]`

	// Annotation keys contain "/", which must be escaped inside a JSON Pointer.
	timePath := escapeJSONPointer(PredicateTime)
	indexPath := escapeJSONPointer(GPUIndex)
	now := time.Now().UnixNano()

	return fmt.Sprintf(patchTemplate, timePath, now, indexPath, id)
}

// RemoveGPUIndexPatch returns a JSON Patch document that removes the
// PredicateTime and GPUIndex annotations from a pod.
//
// The annotation keys are escaped as JSON Pointer reference tokens. Each
// operation must be a complete JSON object; the second one previously lacked
// its closing brace, which made the whole patch invalid JSON.
func RemoveGPUIndexPatch() string {
	return fmt.Sprintf(`[{"op": "remove", "path": "/metadata/annotations/%s"},`+
		`{"op": "remove", "path": "/metadata/annotations/%s"}]`, escapeJSONPointer(PredicateTime), escapeJSONPointer(GPUIndex))
}
Loading