Skip to content

Commit

Permalink
Merge pull request #852 from tizhou86/gpu_sharing
Browse files Browse the repository at this point in the history
Add gpu sharing in predicate logic.
  • Loading branch information
volcano-sh-bot authored Jul 7, 2020
2 parents d1cc54b + c8052c4 commit a5f6672
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 66 deletions.
2 changes: 1 addition & 1 deletion installer/helm/chart/volcano/templates/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ rules:
verbs: ["create", "list", "watch", "update", "patch"]
- apiGroups: [""]
resources: ["pods", "pods/status"]
verbs: ["create", "get", "list", "watch", "update", "bind", "updateStatus", "delete"]
verbs: ["create", "get", "list", "watch", "update", "patch", "bind", "updateStatus", "delete"]
- apiGroups: [""]
resources: ["pods/binding"]
verbs: ["create"]
Expand Down
2 changes: 1 addition & 1 deletion installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ rules:
verbs: ["create", "list", "watch", "update", "patch"]
- apiGroups: [""]
resources: ["pods", "pods/status"]
verbs: ["create", "get", "list", "watch", "update", "bind", "updateStatus", "delete"]
verbs: ["create", "get", "list", "watch", "update", "patch", "bind", "updateStatus", "delete"]
- apiGroups: [""]
resources: ["pods/binding"]
verbs: ["create"]
Expand Down
72 changes: 72 additions & 0 deletions pkg/scheduler/api/device_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2020 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
v1 "k8s.io/api/core/v1"
)

// GPUDevice include gpu id, memory and the pods that are sharing it.
type GPUDevice struct {
// GPU ID
ID int
// The pods that are sharing this GPU
PodMap map[string]*v1.Pod
// memory per card
Memory uint
}

// NewGPUDevice creates a device
func NewGPUDevice(id int, mem uint) *GPUDevice {
return &GPUDevice{
ID: id,
Memory: mem,
PodMap: map[string]*v1.Pod{},
}
}

// getUsedGPUMemory calculates the used memory of the device.
func (g *GPUDevice) getUsedGPUMemory() uint {
res := uint(0)
for _, pod := range g.PodMap {
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
continue
} else {
gpuRequest := GetGPUResourceOfPod(pod)
res += gpuRequest
}
}
return res
}

// GetGPUResourceOfPod returns the GPU resource required by the pod.
func GetGPUResourceOfPod(pod *v1.Pod) uint {
var mem uint
for _, container := range pod.Spec.Containers {
mem += getGPUResourceOfContainer(&container)
}
return mem
}

// getGPUResourceOfPod returns the GPU resource required by the container.
func getGPUResourceOfContainer(container *v1.Container) uint {
var mem uint
if val, ok := container.Resources.Limits[VolcanoGPUResource]; ok {
mem = uint(val.Value())
}
return mem
}
135 changes: 105 additions & 30 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type NodeInfo struct {
Tasks map[TaskID]*TaskInfo

// Used to store custom information
Others map[string]interface{}
Others map[string]interface{}
GPUDevices map[int]*GPUDevice
}

// FutureIdle returns resources that will be idle in the future:
Expand All @@ -65,40 +66,31 @@ type NodeState struct {

// NewNodeInfo is used to create new nodeInfo object
func NewNodeInfo(node *v1.Node) *NodeInfo {
var ni *NodeInfo
nodeinfo := &NodeInfo{
Releasing: EmptyResource(),
Pipelined: EmptyResource(),
Idle: EmptyResource(),
Used: EmptyResource(),

if node == nil {
ni = &NodeInfo{
Releasing: EmptyResource(),
Pipelined: EmptyResource(),
Idle: EmptyResource(),
Used: EmptyResource(),

Allocatable: EmptyResource(),
Capability: EmptyResource(),

Tasks: make(map[TaskID]*TaskInfo),
}
} else {
ni = &NodeInfo{
Name: node.Name,
Node: node,

Releasing: EmptyResource(),
Pipelined: EmptyResource(),
Idle: NewResource(node.Status.Allocatable),
Used: EmptyResource(),
Allocatable: EmptyResource(),
Capability: EmptyResource(),

Allocatable: NewResource(node.Status.Allocatable),
Capability: NewResource(node.Status.Capacity),
Tasks: make(map[TaskID]*TaskInfo),

Tasks: make(map[TaskID]*TaskInfo),
}
GPUDevices: make(map[int]*GPUDevice),
}

ni.setNodeState(node)
if node != nil {
nodeinfo.Name = node.Name
nodeinfo.Node = node
nodeinfo.Idle = NewResource(node.Status.Allocatable)
nodeinfo.Allocatable = NewResource(node.Status.Allocatable)
nodeinfo.Capability = NewResource(node.Status.Capacity)
}
nodeinfo.setNodeGPUInfo(node)
nodeinfo.setNodeState(node)

return ni
return nodeinfo
}

// Clone used to clone nodeInfo Object
Expand All @@ -108,7 +100,6 @@ func (ni *NodeInfo) Clone() *NodeInfo {
for _, p := range ni.Tasks {
res.AddTask(p)
}
res.Others = ni.Others
return res
}

Expand Down Expand Up @@ -154,9 +145,36 @@ func (ni *NodeInfo) setNodeState(node *v1.Node) {
}
}

func (ni *NodeInfo) setNodeGPUInfo(node *v1.Node) {
if node == nil {
return
}
memory, ok := node.Status.Capacity[VolcanoGPUResource]
if !ok {
return
}
totalMemory := memory.Value()

res, ok := node.Status.Capacity[VolcanoGPUNumber]
if !ok {
return
}
gpuNumber := res.Value()
if gpuNumber == 0 {
klog.Warningf("invalid %s=%s", VolcanoGPUNumber, res.String())
return
}

memoryPerCard := uint(totalMemory / gpuNumber)
for i := 0; i < int(gpuNumber); i++ {
ni.GPUDevices[i] = NewGPUDevice(i, memoryPerCard)
}
}

// SetNode sets kubernetes node object to nodeInfo object
func (ni *NodeInfo) SetNode(node *v1.Node) {
ni.setNodeState(node)
ni.setNodeGPUInfo(node)

if !ni.Ready() {
klog.Warningf("Failed to set node info, phase: %s, reason: %s",
Expand All @@ -180,11 +198,13 @@ func (ni *NodeInfo) SetNode(node *v1.Node) {
ni.Idle.Sub(ti.Resreq)
ni.Releasing.Add(ti.Resreq)
ni.Used.Add(ti.Resreq)
ni.AddGPUResource(ti.Pod)
case Pipelined:
ni.Pipelined.Add(ti.Resreq)
default:
ni.Idle.Sub(ti.Resreq)
ni.Used.Add(ti.Resreq)
ni.AddGPUResource(ti.Pod)
}
}
}
Expand Down Expand Up @@ -225,13 +245,15 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
}
ni.Releasing.Add(ti.Resreq)
ni.Used.Add(ti.Resreq)
ni.AddGPUResource(ti.Pod)
case Pipelined:
ni.Pipelined.Add(ti.Resreq)
default:
if err := ni.allocateIdleResource(ti); err != nil {
return err
}
ni.Used.Add(ti.Resreq)
ni.AddGPUResource(ti.Pod)
}
}

Expand Down Expand Up @@ -261,11 +283,13 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
ni.Releasing.Sub(task.Resreq)
ni.Idle.Add(task.Resreq)
ni.Used.Sub(task.Resreq)
ni.AddGPUResource(ti.Pod)
case Pipelined:
ni.Pipelined.Sub(task.Resreq)
default:
ni.Idle.Add(task.Resreq)
ni.Used.Sub(task.Resreq)
ni.SubGPUResource(ti.Pod)
}
}

Expand Down Expand Up @@ -314,3 +338,54 @@ func (ni *NodeInfo) Pods() (pods []*v1.Pod) {

return
}

// GetDevicesIdleGPUMemory returns all the idle GPU memory by gpu card.
func (ni *NodeInfo) GetDevicesIdleGPUMemory() map[int]uint {
devicesAllGPUMemory := ni.getDevicesAllGPUMemory()
devicesUsedGPUMemory := ni.getDevicesUsedGPUMemory()
res := map[int]uint{}
for id, allMemory := range devicesAllGPUMemory {
if usedMemory, found := devicesUsedGPUMemory[id]; found {
res[id] = allMemory - usedMemory
}
}
return res
}

func (ni *NodeInfo) getDevicesUsedGPUMemory() map[int]uint {
res := map[int]uint{}
for _, device := range ni.GPUDevices {
res[device.ID] = device.getUsedGPUMemory()
}
return res
}

func (ni *NodeInfo) getDevicesAllGPUMemory() map[int]uint {
res := map[int]uint{}
for _, device := range ni.GPUDevices {
res[device.ID] = device.Memory
}
return res
}

// AddGPUResource adds the pod to GPU pool if it is assigned
func (ni *NodeInfo) AddGPUResource(pod *v1.Pod) {
gpuRes := GetGPUResourceOfPod(pod)
if gpuRes > 0 {
id := GetGPUIndex(pod)
if dev := ni.GPUDevices[id]; dev != nil {
dev.PodMap[string(pod.UID)] = pod
}
}
}

// SubGPUResource frees the gpu hold by the pod
func (ni *NodeInfo) SubGPUResource(pod *v1.Pod) {
gpuRes := GetGPUResourceOfPod(pod)
if gpuRes > 0 {
id := GetGPUIndex(pod)
if dev := ni.GPUDevices[id]; dev != nil {
delete(dev.PodMap, string(pod.UID))
}
}
}
3 changes: 3 additions & 0 deletions pkg/scheduler/api/node_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p2": NewTaskInfo(case01Pod2),
},
GPUDevices: make(map[int]*GPUDevice),
},
},
{
Expand All @@ -79,6 +80,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
Capability: buildResource("2000m", "1G"),
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{},
GPUDevices: make(map[int]*GPUDevice),
},
expectedFailure: true,
},
Expand Down Expand Up @@ -138,6 +140,7 @@ func TestNodeInfo_RemovePod(t *testing.T) {
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p3": NewTaskInfo(case01Pod3),
},
GPUDevices: make(map[int]*GPUDevice),
},
},
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/scheduler/api/pod_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ limitations under the License.
package api

import (
"fmt"
"strconv"
"strings"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/klog"
)

// Refer k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go#GetResourceRequest.
Expand Down Expand Up @@ -71,3 +77,41 @@ func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource {

return result
}

// GetGPUIndex returns the ID of the GPU
func GetGPUIndex(pod *v1.Pod) int {
if len(pod.Annotations) > 0 {
value, found := pod.Annotations[GPUIndex]
if found {
id, err := strconv.Atoi(value)
if err != nil {
klog.Errorf("invalid %s=%s", GPUIndex, value)
return -1
}
return id
}
}

return -1
}

func escapeJSONPointer(p string) string {
// Escaping reference name using https://tools.ietf.org/html/rfc6901
p = strings.Replace(p, "~", "~0", -1)
p = strings.Replace(p, "/", "~1", -1)
return p
}

// AddGPUIndexPatch returns the patch adding GPU index
func AddGPUIndexPatch(id int) string {
return fmt.Sprintf(`[{"op": "add", "path": "/metadata/annotations/%s", "value":"%d"},`+
`{"op": "add", "path": "/metadata/annotations/%s", "value": "%d"}]`,
escapeJSONPointer(PredicateTime), time.Now().UnixNano(),
escapeJSONPointer(GPUIndex), id)
}

// RemoveGPUIndexPatch returns the patch removing GPU index
func RemoveGPUIndexPatch() string {
return fmt.Sprintf(`[{"op": "remove", "path": "/metadata/annotations/%s"},`+
`{"op": "remove", "path": "/metadata/annotations/%s"]`, escapeJSONPointer(PredicateTime), escapeJSONPointer(GPUIndex))
}
Loading

0 comments on commit a5f6672

Please sign in to comment.