From 38cec5c2825ad88e2db5da57c01343718aabb464 Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Tue, 23 Jun 2020 15:29:18 +0800 Subject: [PATCH] Continue working on gpu sharing --- .../chart/volcano/templates/scheduler.yaml | 2 +- installer/volcano-development.yaml | 2 +- pkg/scheduler/api/device_info.go | 67 +++--- pkg/scheduler/api/node_info.go | 194 +++++++----------- pkg/scheduler/api/pod_info.go | 43 +++- pkg/scheduler/api/well_known_labels.go | 30 +++ pkg/scheduler/framework/session.go | 5 + pkg/scheduler/plugins/predicates/gpu.go | 56 +++++ .../plugins/predicates/predicates.go | 116 +++++------ 9 files changed, 290 insertions(+), 225 deletions(-) create mode 100644 pkg/scheduler/api/well_known_labels.go create mode 100644 pkg/scheduler/plugins/predicates/gpu.go diff --git a/installer/helm/chart/volcano/templates/scheduler.yaml b/installer/helm/chart/volcano/templates/scheduler.yaml index 26252969a8..b810442d9d 100644 --- a/installer/helm/chart/volcano/templates/scheduler.yaml +++ b/installer/helm/chart/volcano/templates/scheduler.yaml @@ -31,7 +31,7 @@ rules: verbs: ["create", "list", "watch", "update", "patch"] - apiGroups: [""] resources: ["pods", "pods/status"] - verbs: ["create", "get", "list", "watch", "update", "bind", "updateStatus", "delete"] + verbs: ["create", "get", "list", "watch", "update", "patch", "bind", "updateStatus", "delete"] - apiGroups: [""] resources: ["pods/binding"] verbs: ["create"] diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index a030715211..83b07d095a 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -50,7 +50,7 @@ rules: verbs: ["create", "list", "watch", "update", "patch"] - apiGroups: [""] resources: ["pods", "pods/status"] - verbs: ["create", "get", "list", "watch", "update", "bind", "updateStatus", "delete"] + verbs: ["create", "get", "list", "watch", "update", "patch", "bind", "updateStatus", "delete"] - apiGroups: [""] resources: 
["pods/binding"] verbs: ["create"] diff --git a/pkg/scheduler/api/device_info.go b/pkg/scheduler/api/device_info.go index 1c15a56390..11edae292a 100644 --- a/pkg/scheduler/api/device_info.go +++ b/pkg/scheduler/api/device_info.go @@ -1,5 +1,5 @@ /* -Copyright 2017 The Kubernetes Authors. +Copyright 2020 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -17,47 +17,56 @@ limitations under the License. package api import ( - "strconv" - v1 "k8s.io/api/core/v1" ) -type DeviceInfo struct { - Id int - PodMap map[string]*v1.Pod - GPUTotalMemory uint +// GPUDevice include gpu id, memory and the pods that are sharing it. +type GPUDevice struct { + // GPU ID + ID int + // The pods that are sharing this GPU + PodMap map[string]*v1.Pod + // memory per card + Memory uint } -func (di *DeviceInfo) GetPods() []*v1.Pod { - pods := []*v1.Pod{} - for _, pod := range di.PodMap { - pods = append(pods, pod) +// NewGPUDevice creates a device +func NewGPUDevice(id int, mem uint) *GPUDevice { + return &GPUDevice{ + ID: id, + Memory: mem, + PodMap: map[string]*v1.Pod{}, } - return pods } -func NewDeviceInfo(id int, mem uint) *DeviceInfo { - return &DeviceInfo{ - Id: id, - GPUTotalMemory: mem, - PodMap: map[string]*v1.Pod{}, - } -} - -func (di *DeviceInfo) GetUsedGPUMemory() uint { +// getUsedGPUMemory calculates the used memory of the device. +func (g *GPUDevice) getUsedGPUMemory() uint { res := uint(0) - for _, pod := range di.PodMap { + for _, pod := range g.PodMap { if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed { continue } else { - if len(pod.ObjectMeta.Annotations) > 0 { - mem, found := pod.ObjectMeta.Annotations["volcano.sh/pod-gpu-memory"] - if found { - m, _ := strconv.Atoi(mem) - res += uint(m) - } - } + gpuRequest := GetGPUResourceOfPod(pod) + res += gpuRequest } } return res } + +// GetGPUResourceOfPod returns the GPU resource required by the pod. 
+func GetGPUResourceOfPod(pod *v1.Pod) uint { + var mem uint + for _, container := range pod.Spec.Containers { + mem += getGPUResourceOfContainer(&container) + } + return mem +} + +// getGPUResourceOfPod returns the GPU resource required by the container. +func getGPUResourceOfContainer(container *v1.Container) uint { + var mem uint + if val, ok := container.Resources.Limits[VolcanoGPUResource]; ok { + mem = uint(val.Value()) + } + return mem +} diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index a489553daa..7737a66ae0 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -18,7 +18,6 @@ package api import ( "fmt" - "strconv" v1 "k8s.io/api/core/v1" "k8s.io/klog" @@ -47,12 +46,9 @@ type NodeInfo struct { Tasks map[TaskID]*TaskInfo - Devices map[int]*DeviceInfo - GPUTotalCore int - GPUTotalMemory int - // Used to store custom information - Others map[string]interface{} + Others map[string]interface{} + GPUDevices map[int]*GPUDevice } // FutureIdle returns resources that will be idle in the future: @@ -70,49 +66,31 @@ type NodeState struct { // NewNodeInfo is used to create new nodeInfo object func NewNodeInfo(node *v1.Node) *NodeInfo { - var ni *NodeInfo - - if node == nil { - ni = &NodeInfo{ - Releasing: EmptyResource(), - Pipelined: EmptyResource(), - Idle: EmptyResource(), - Used: EmptyResource(), - - Allocatable: EmptyResource(), - Capability: EmptyResource(), - - Tasks: make(map[TaskID]*TaskInfo), + nodeinfo := &NodeInfo{ + Releasing: EmptyResource(), + Pipelined: EmptyResource(), + Idle: EmptyResource(), + Used: EmptyResource(), - Devices: make(map[int]*DeviceInfo), - GPUTotalCore: 0, - GPUTotalMemory: 0, - } - } else { - ni = &NodeInfo{ - Name: node.Name, - Node: node, - - Releasing: EmptyResource(), - Pipelined: EmptyResource(), - Idle: NewResource(node.Status.Allocatable), - Used: EmptyResource(), - - Allocatable: NewResource(node.Status.Allocatable), - Capability: 
NewResource(node.Status.Capacity),
+		Allocatable: EmptyResource(),
+		Capability:  EmptyResource(),
-		Tasks: make(map[TaskID]*TaskInfo),
+		Tasks: make(map[TaskID]*TaskInfo),
-		Devices:        make(map[int]*DeviceInfo),
-		GPUTotalCore:   0,
-		GPUTotalMemory: 0,
-	}
+		GPUDevices: make(map[int]*GPUDevice),
 	}
-	ni.SetNodeGPUInfo(node)
-	ni.setNodeState(node)
+	if node != nil {
+		nodeinfo.Name = node.Name
+		nodeinfo.Node = node
+		nodeinfo.Idle = NewResource(node.Status.Allocatable)
+		nodeinfo.Allocatable = NewResource(node.Status.Allocatable)
+		nodeinfo.Capability = NewResource(node.Status.Capacity)
+	}
+	nodeinfo.setNodeGPUInfo(node)
+	nodeinfo.setNodeState(node)
 
-	return ni
+	return nodeinfo
 }
 
 // Clone used to clone nodeInfo Object
@@ -122,7 +100,6 @@ func (ni *NodeInfo) Clone() *NodeInfo {
 	for _, p := range ni.Tasks {
 		res.AddTask(p)
 	}
-	res.Others = ni.Others
 	return res
 }
 
@@ -168,27 +145,36 @@ func (ni *NodeInfo) setNodeState(node *v1.Node) {
 	}
 }
 
-func (ni *NodeInfo) SetNodeGPUInfo(node *v1.Node) {
-
-	core, ok := node.Status.Capacity["volcano.sh/node-gpu-core"]
-	if ok {
-		ni.GPUTotalCore = int(core.Value())
+func (ni *NodeInfo) setNodeGPUInfo(node *v1.Node) {
+	if node == nil {
+		return
 	}
-
-	mem, ok := node.Status.Capacity["volcano.sh/node-gpu-memory"]
-	if ok {
-		ni.GPUTotalMemory = int(mem.Value())
+	memory, ok := node.Status.Capacity[VolcanoGPUResource]
+	if !ok {
+		return
 	}
+	totalMemory := memory.Value()
 
-	for i := 0; i < int(core.Value()); i++ {
-		ni.Devices[i] = NewDeviceInfo(i, uint(int(mem.Value())/int(core.Value())))
+	res, ok := node.Status.Capacity[VolcanoGPUNumber]
+	if !ok {
+		return
+	}
+	gpuNumber := res.Value()
+	if gpuNumber == 0 {
+		klog.Warningf("invalid %s=%s", VolcanoGPUNumber, res)
+		// must bail out here: falling through would divide by zero below
+		return
 	}
+	memoryPerCard := uint(totalMemory / gpuNumber)
+	for i := 0; i < int(gpuNumber); i++ {
+		ni.GPUDevices[i] = NewGPUDevice(i, memoryPerCard)
+	}
 }
 
 // SetNode sets kubernetes node object to nodeInfo object
 func (ni *NodeInfo) SetNode(node *v1.Node) {
 	ni.setNodeState(node)
+
ni.setNodeGPUInfo(node)
 
 	if !ni.Ready() {
 		klog.Warningf("Failed to set node info, phase: %s, reason: %s",
@@ -212,11 +197,13 @@ func (ni *NodeInfo) SetNode(node *v1.Node) {
 			ni.Idle.Sub(ti.Resreq)
 			ni.Releasing.Add(ti.Resreq)
 			ni.Used.Add(ti.Resreq)
+			ni.AddGPUResource(ti.Pod)
 		case Pipelined:
 			ni.Pipelined.Add(ti.Resreq)
 		default:
 			ni.Idle.Sub(ti.Resreq)
 			ni.Used.Add(ti.Resreq)
+			ni.AddGPUResource(ti.Pod)
 		}
 	}
 }
@@ -257,6 +244,7 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
 			}
 			ni.Releasing.Add(ti.Resreq)
 			ni.Used.Add(ti.Resreq)
+			ni.AddGPUResource(ti.Pod)
 		case Pipelined:
 			ni.Pipelined.Add(ti.Resreq)
 		default:
@@ -264,6 +252,7 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
 				return err
 			}
 			ni.Used.Add(ti.Resreq)
+			ni.AddGPUResource(ti.Pod)
 		}
 	}
 
@@ -293,11 +282,13 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
 			ni.Releasing.Sub(task.Resreq)
 			ni.Idle.Add(task.Resreq)
 			ni.Used.Sub(task.Resreq)
+			ni.SubGPUResource(ti.Pod)
 		case Pipelined:
 			ni.Pipelined.Sub(task.Resreq)
 		default:
 			ni.Idle.Add(task.Resreq)
 			ni.Used.Sub(task.Resreq)
+			ni.SubGPUResource(ti.Pod)
 		}
 	}
 
@@ -347,38 +338,10 @@ func (ni *NodeInfo) Pods() (pods []*v1.Pod) {
 	return
 }
 
-func (ni *NodeInfo) CheckPredicatePodOnGPUNode(pod *v1.Pod) bool {
-	res := false
-	memReq := uint(0)
-
-	remainMems := ni.GetDevicesRemainGPUMemory()
-	if len(pod.ObjectMeta.Annotations) > 0 {
-		mem, found := pod.ObjectMeta.Annotations["volcano.sh/pod-gpu-memory"]
-		if found {
-			m, _ := strconv.Atoi(mem)
-			memReq = uint(m)
-		}
-	}
-
-	if len(remainMems) > 0 {
-		for devID := 0; devID < len(ni.Devices); devID++ {
-			availableGPU, ok := remainMems[devID]
-			if ok {
-				if availableGPU >= memReq {
-					res = true
-					break
-				}
-			}
-		}
-	}
-
-	return res
-
-}
-
-func (ni *NodeInfo) GetDevicesRemainGPUMemory() map[int]uint {
-	devicesAllGPUMemory := ni.GetDevicesAllGPUMemory()
-	devicesUsedGPUMemory := ni.GetDevicesUsedGPUMemory()
+// GetDevicesIdleGPUMemory returns all the idle GPU memory by gpu card.
+func (ni *NodeInfo) GetDevicesIdleGPUMemory() map[int]uint { + devicesAllGPUMemory := ni.getDevicesAllGPUMemory() + devicesUsedGPUMemory := ni.getDevicesUsedGPUMemory() res := map[int]uint{} for id, allMemory := range devicesAllGPUMemory { if usedMemory, found := devicesUsedGPUMemory[id]; found { @@ -388,55 +351,40 @@ func (ni *NodeInfo) GetDevicesRemainGPUMemory() map[int]uint { return res } -func (ni *NodeInfo) GetDevicesUsedGPUMemory() map[int]uint { +func (ni *NodeInfo) getDevicesUsedGPUMemory() map[int]uint { res := map[int]uint{} - for _, device := range ni.Devices { - res[device.Id] = device.GetUsedGPUMemory() + for _, device := range ni.GPUDevices { + res[device.ID] = device.getUsedGPUMemory() } return res } -func (ni *NodeInfo) GetDevicesAllGPUMemory() map[int]uint { +func (ni *NodeInfo) getDevicesAllGPUMemory() map[int]uint { res := map[int]uint{} - for _, device := range ni.Devices { - res[device.Id] = device.GPUTotalMemory + for _, device := range ni.GPUDevices { + res[device.ID] = device.Memory } return res } -func (ni *NodeInfo) GetDeviceCoreId(pod *v1.Pod) (int, bool) { - - resId := -1 - resOk := false - cMem := uint(0) - rMems := ni.GetDevicesRemainGPUMemory() - - gpuReq := uint(0) - - if len(pod.ObjectMeta.Annotations) > 0 { - req, ok := pod.ObjectMeta.Annotations["volcano.sh/pod-gpu-memory"] - if ok { - s, _ := strconv.Atoi(req) - gpuReq = uint(s) +// AddGPUResource adds the pod to GPU pool if it is assigned +func (ni *NodeInfo) AddGPUResource(pod *v1.Pod) { + gpuRes := GetGPUResourceOfPod(pod) + if gpuRes > 0 { + id := GetGPUIndex(pod) + if dev := ni.GPUDevices[id]; dev != nil { + dev.PodMap[string(pod.UID)] = pod } } +} - if gpuReq > uint(0) { - if len(rMems) > 0 { - for i := 0; i < len(ni.Devices); i++ { - rMem, ok := rMems[i] - if ok { - if rMem >= gpuReq { - if resId == -1 || cMem > rMem { - resId = i - cMem = rMem - } - resOk = true - } - } - } +// SubGPUResource frees the gpu hold by the pod +func (ni *NodeInfo) SubGPUResource(pod *v1.Pod) 
{
+	gpuRes := GetGPUResourceOfPod(pod)
+	if gpuRes > 0 {
+		id := GetGPUIndex(pod)
+		if dev := ni.GPUDevices[id]; dev != nil {
+			delete(dev.PodMap, string(pod.UID))
 		}
 	}
-
-	return resId, resOk
 }
diff --git a/pkg/scheduler/api/pod_info.go b/pkg/scheduler/api/pod_info.go
index 0a0e68ad41..56405ecc77 100644
--- a/pkg/scheduler/api/pod_info.go
+++ b/pkg/scheduler/api/pod_info.go
@@ -18,8 +18,12 @@ package api
 
 import (
 	"fmt"
+	"strconv"
+	"strings"
+	"time"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/klog"
 )
 
 // Refer k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go#GetResourceRequest.
@@ -74,13 +78,40 @@ func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource {
 	return result
 }
 
-func UpdateGPUPod(oldPod *v1.Pod, coreId int, memoryPerCore int) (newPod *v1.Pod) {
-	newPod = oldPod.DeepCopy()
-	if len(newPod.ObjectMeta.Annotations) == 0 {
-		newPod.ObjectMeta.Annotations = map[string]string{}
+// GetGPUIndex returns the ID of the GPU
+func GetGPUIndex(pod *v1.Pod) int {
+	if len(pod.Annotations) > 0 {
+		value, found := pod.Annotations[GPUIndex]
+		if found {
+			id, err := strconv.Atoi(value)
+			if err != nil {
+				klog.Errorf("invalid %s=%s", GPUIndex, value)
+				return -1
+			}
+			return id
+		}
 	}
-	newPod.ObjectMeta.Annotations["volcano.sh/gpu-core-id"] = fmt.Sprintf("%d", coreId)
+	return -1
+}
+
+func escapeJSONPointer(p string) string {
+	// Escaping reference name using https://tools.ietf.org/html/rfc6901
+	p = strings.Replace(p, "~", "~0", -1)
+	p = strings.Replace(p, "/", "~1", -1)
+	return p
+}
+
+// AddGPUIndexPatch returns the patch adding GPU index
+func AddGPUIndexPatch(id int) string {
+	return fmt.Sprintf(`[{"op": "add", "path": "/metadata/annotations/%s", "value":"%d"},`+
+		`{"op": "add", "path": "/metadata/annotations/%s", "value": "%d"}]`,
+		escapeJSONPointer(PredicateTime), time.Now().UnixNano(),
+		escapeJSONPointer(GPUIndex), id)
+}
 
-	return newPod
+// RemoveGPUIndexPatch returns the patch removing GPU index
+func RemoveGPUIndexPatch() string {
+
return fmt.Sprintf(`[{"op": "remove", "path": "/metadata/annotations/%s"},`+
+		`{"op": "remove", "path": "/metadata/annotations/%s"}]`, escapeJSONPointer(PredicateTime), escapeJSONPointer(GPUIndex))
+}
diff --git a/pkg/scheduler/api/well_known_labels.go b/pkg/scheduler/api/well_known_labels.go
new file mode 100644
index 0000000000..c412d24529
--- /dev/null
+++ b/pkg/scheduler/api/well_known_labels.go
@@ -0,0 +1,30 @@
+/*
+Copyright 2020 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+const (
+
+	// VolcanoGPUResource extended gpu resource
+	VolcanoGPUResource = "volcano.sh/gpu-memory"
+	// VolcanoGPUNumber virtual GPU card number
+	VolcanoGPUNumber = "volcano.sh/gpu-number"
+
+	// PredicateTime is the key of predicate time
+	PredicateTime = "volcano.sh/predicate-time"
+	// GPUIndex is the key of gpu index
+	GPUIndex = "volcano.sh/gpu-index"
+)
diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go
index b67f9b1f01..ac0e5c977f 100644
--- a/pkg/scheduler/framework/session.go
+++ b/pkg/scheduler/framework/session.go
@@ -401,6 +401,11 @@ func (ssn *Session) AddEventHandler(eh *EventHandler) {
 	ssn.eventHandlers = append(ssn.eventHandlers, eh)
 }
 
+// KubeClient returns the kubernetes client
+func (ssn Session) KubeClient() kubernetes.Interface {
+	return ssn.kubeClient
+}
+
 //String return nodes and jobs information in the session
 func (ssn Session) String() string {
 	msg := fmt.Sprintf("Session %v: \n", ssn.UID)
diff --git
a/pkg/scheduler/plugins/predicates/gpu.go b/pkg/scheduler/plugins/predicates/gpu.go new file mode 100644 index 0000000000..cd28ac0db4 --- /dev/null +++ b/pkg/scheduler/plugins/predicates/gpu.go @@ -0,0 +1,56 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package predicates + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + + "volcano.sh/volcano/pkg/scheduler/api" +) + +// checkNodeGPUSharingPredicate checks if a gpu sharing pod can be scheduled on a node. +func checkNodeGPUSharingPredicate(pod *v1.Pod, nodeInfo *api.NodeInfo) (bool, error) { + // no gpu sharing request + if api.GetGPUResourceOfPod(pod) <= 0 { + return true, nil + } + + id := predicateGPU(pod, nodeInfo) + if id < 0 { + return false, fmt.Errorf("no enough gpu memory on single device of node %s", nodeInfo.Name) + } + return true, nil +} + +// predicateGPU returns the available GPU ID +func predicateGPU(pod *v1.Pod, node *api.NodeInfo) int { + gpuRequest := api.GetGPUResourceOfPod(pod) + allocatableGPUs := node.GetDevicesIdleGPUMemory() + + for devID := 0; devID < len(allocatableGPUs); devID++ { + availableGPU, ok := allocatableGPUs[devID] + if ok { + if availableGPU >= gpuRequest { + return devID + } + } + } + + return -1 +} diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index d0988eddab..ac8c524432 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go 
@@ -21,9 +21,9 @@ import (
+	"context"
 	"fmt"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-
-	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/klog"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
@@ -100,7 +99,7 @@ func enablePredicate(args framework.Arguments) predicateEnable {
 		memoryPressureEnable: false,
 		diskPressureEnable:   false,
 		pidPressureEnable:    false,
-		gpuSharingEnable:     false,
+		gpuSharingEnable:     false, // TODO: read from plugin arguments; must not default to true in a release
 	}
 
 	// Checks whether predicate.MemoryPressureEnable is provided or not, if given, modifies the value in predicateEnable struct.
@@ -125,9 +124,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
 
 	predicate := enablePredicate(pp.pluginArguments)
 
-	//TODO: (tizhou86) zhonghu will pass the client set into this function
-	clientSet := NewClientSetOrClientSetInstance()
-
+	kubeClient := ssn.KubeClient()
 	// Register event handlers to update task info in PodLister & nodeMap
 	ssn.AddEventHandler(&framework.EventHandler{
 		AllocateFunc: func(event *framework.Event) {
@@ -135,58 +132,62 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
 			nodeName := event.Task.NodeName
 			node, found := nodeMap[nodeName]
 
-			nodeInfo, foundInfo := ssn.Nodes[nodeName]
-
-			if predicate.gpuSharingEnable {
-				if !foundInfo {
-					klog.Warningf("predicates with gpu sharing, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
-				} else {
-					coreId, found := nodeInfo.GetDeviceCoreId(pod)
-					if found {
-						var err error
-						pod, err = clientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
-						newPod := api.UpdateGPUPod(pod, coreId, nodeInfo.GPUTotalMemory/nodeInfo.GPUTotalCore)
-						_, err = clientSet.CoreV1().Pods(newPod.Namespace).Update(newPod)
-					} else {
-						klog.Errorf("The node %s can't place the pod %s in ns %s", pod.Spec.NodeName, pod.Name, pod.Namespace)
-					}
-
-					dev, found :=
nodeInfo.Devices[coreId] - if !found { - - } else { - dev.PodMap[newPod.UID] = newPod - node.AddPod(pod) - } - - klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) + if !found { + klog.Errorf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + return + } + + if predicate.gpuSharingEnable && api.GetGPUResourceOfPod(pod) > 0 { + nodeInfo, _ := ssn.Nodes[nodeName] + id := predicateGPU(pod, nodeInfo) + if id < 0 { + klog.Errorf("The node %s can't place the pod %s in ns %s", pod.Spec.NodeName, pod.Name, pod.Namespace) + return } - } else { - if !found { - klog.Warningf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) - } else { - node.AddPod(pod) - klog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) + patch := api.AddGPUIndexPatch(id) + pod, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{}) + if err != nil { + klog.Errorf("Patch pod %s failed with patch %s: %v", pod.Name, patch, err) + return } + dev, _ := nodeInfo.GPUDevices[id] + dev.PodMap[string(pod.UID)] = pod + klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) } + node.AddPod(pod) + klog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) }, DeallocateFunc: func(event *framework.Event) { pod := pl.UpdateTask(event.Task, "") - nodeName := event.Task.NodeName node, found := nodeMap[nodeName] + if !found { + klog.Errorf("predicates, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + return + } + + if predicate.gpuSharingEnable && api.GetGPUResourceOfPod(pod) > 0 { + // deallocate pod gpu id + id := api.GetGPUIndex(pod) + patch := api.RemoveGPUIndexPatch() 
+ _, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{}) + if err != nil { + klog.Errorf("Patch pod %s failed with patch %s: %v", pod.Name, patch, err) + return + } - if predicate.gpuSharingEnable { - //TODO: (tizhou86) add deallocate logic - } else { - if !found { - klog.Warningf("predicates, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) - } else { - node.RemovePod(pod) - klog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName) + nodeInfo, _ := ssn.Nodes[nodeName] + if dev, ok := nodeInfo.GPUDevices[id]; ok { + delete(dev.PodMap, string(pod.UID)) } + + klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName) } + + node.RemovePod(pod) + klog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName) + }, }) @@ -258,16 +259,15 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { return fmt.Errorf("plugin %s predicates failed %s", interpodaffinity.Name, status.Message()) } - // if predicate.gpuSharingEnable - if true { + if predicate.gpuSharingEnable { // CheckGPUSharingPredicate - fit, err := CheckNodeGPUSharingPredicate(task.Pod, node) + fit, err := checkNodeGPUSharingPredicate(task.Pod, node) if err != nil { return err } - klog.V(4).Infof("CheckNodeGPUSharingPredicate predicates Task <%s/%s> on Node <%s>: fit %t, err %v", - task.Namespace, task.Name, node.Name, fit, err) + klog.V(4).Infof("checkNodeGPUSharingPredicate predicates Task <%s/%s> on Node <%s>: fit %v", + task.Namespace, task.Name, node.Name, fit) } return nil @@ -275,17 +275,3 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { } func (pp *predicatesPlugin) OnSessionClose(ssn *framework.Session) {} - -// CheckNodeGPUSharingPredicate checks if a gpu sharing pod can be scheduled 
on a node. -func CheckNodeGPUSharingPredicate(pod *v1.Pod, nodeInfo *api.NodeInfo) (bool, error) { - _, ok := nodeInfo.Node.Status.Capacity["volcano.sh/node-gpu-core"] - if !ok { - return false, fmt.Errorf("node is not gpu sharing") - } else { - isEnoughGPUMemoryOnNode := nodeInfo.CheckPredicatePodOnGPUNode(pod) - if !isEnoughGPUMemoryOnNode { - return false, fmt.Errorf("no enough gpu memory on single device") - } - } - return true, nil -}