
Commit

Merge pull request #2371 from wpeng102/cache
add csiNode cache for plugin
volcano-sh-bot authored Jul 25, 2022
2 parents (8b8951f + 0b5ef86), commit 2ecf008
Showing 4 changed files with 96 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/scheduler/api/cluster_info.go
@@ -28,6 +28,7 @@ type ClusterInfo struct {
     NamespaceInfo map[NamespaceName]*NamespaceInfo
     RevocableNodes map[string]*NodeInfo
     NodeList []string
+    CSINodesStatus map[string]*CSINodeStatusInfo
 }
 
 func (ci ClusterInfo) String() string {
16 changes: 16 additions & 0 deletions pkg/scheduler/api/node_info.go
@@ -35,6 +35,11 @@ func (o *AllocateFailError) Error() string {
     return o.Reason
 }
 
+type CSINodeStatusInfo struct {
+    CSINodeName string
+    DriverStatus map[string]bool
+}
+
 // NodeInfo is node level aggregated information.
 type NodeInfo struct {
     Name string
@@ -607,3 +612,14 @@ func (ni *NodeInfo) getUnhealthyGPUs(node *v1.Node) (unhealthyGPUs []int) {
     }
     return
 }
+
+func (cs *CSINodeStatusInfo) Clone() *CSINodeStatusInfo {
+    newcs := &CSINodeStatusInfo{
+        CSINodeName: cs.CSINodeName,
+        DriverStatus: make(map[string]bool),
+    }
+    for k, v := range cs.DriverStatus {
+        newcs.DriverStatus[k] = v
+    }
+    return newcs
+}
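
Clone deep-copies the DriverStatus map, so a snapshot's entry can be read or modified without touching the cached original. A minimal standalone sketch of that guarantee, assuming the package import path volcano.sh/volcano/pkg/scheduler/api; the node and driver names are illustrative only:

    package main

    import (
        "fmt"

        api "volcano.sh/volcano/pkg/scheduler/api"
    )

    func main() {
        // Illustrative cached entry; "csi.example.com" is a made-up driver name.
        cached := &api.CSINodeStatusInfo{
            CSINodeName:  "node-1",
            DriverStatus: map[string]bool{"csi.example.com": true},
        }

        snap := cached.Clone()
        snap.DriverStatus["csi.example.com"] = false // mutate only the copy

        fmt.Println(cached.DriverStatus["csi.example.com"]) // prints: true
    }
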
14 changes: 14 additions & 0 deletions pkg/scheduler/cache/cache.go
@@ -128,6 +128,7 @@ type SchedulerCache struct {
     NodeList []string
     defaultPriorityClass *schedulingv1.PriorityClass
     defaultPriority int32
+    CSINodesStatus map[string]*schedulingapi.CSINodeStatusInfo
 
     NamespaceCollection map[string]*schedulingapi.NamespaceCollection
 
@@ -415,6 +416,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
         schedulerName: schedulerName,
         nodeSelectorLabels: make(map[string]string),
         NamespaceCollection: make(map[string]*schedulingapi.NamespaceCollection),
+        CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo),
 
         NodeList: []string{},
     }
@@ -509,6 +511,13 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
     sc.pvInformer = informerFactory.Core().V1().PersistentVolumes()
     sc.scInformer = informerFactory.Storage().V1().StorageClasses()
     sc.csiNodeInformer = informerFactory.Storage().V1().CSINodes()
+    sc.csiNodeInformer.Informer().AddEventHandler(
+        cache.ResourceEventHandlerFuncs{
+            AddFunc: sc.AddOrUpdateCSINode,
+            UpdateFunc: sc.UpdateCSINode,
+            DeleteFunc: sc.DeleteCSINode,
+        },
+    )
     sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers()
     sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities()
 
@@ -993,13 +1002,18 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
         NamespaceInfo: make(map[schedulingapi.NamespaceName]*schedulingapi.NamespaceInfo),
         RevocableNodes: make(map[string]*schedulingapi.NodeInfo),
         NodeList: make([]string, len(sc.NodeList)),
+        CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo),
     }
 
     copy(snapshot.NodeList, sc.NodeList)
     for _, value := range sc.Nodes {
         value.RefreshNumaSchedulerInfoByCrd()
     }
 
+    for _, value := range sc.CSINodesStatus {
+        snapshot.CSINodesStatus[value.CSINodeName] = value.Clone()
+    }
+
     for _, value := range sc.Nodes {
         if !value.Ready() {
             continue
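
Snapshot now hands each scheduling session an isolated copy of the CSI node status, keyed by CSINode name (which matches the node name). A hedged sketch of how a plugin-side lookup could use it; the helper, package name, and driver argument are hypothetical, and the import path is assumed as above:

    package plugindemo

    import (
        api "volcano.sh/volcano/pkg/scheduler/api"
    )

    // driverReportsLimit is a hypothetical helper: DriverStatus[driver] is true
    // only when that driver's CSINode entry publishes a non-nil Allocatable.Count,
    // i.e. the node advertises a volume attachment limit for the driver.
    func driverReportsLimit(snapshot *api.ClusterInfo, nodeName, driver string) bool {
        cs, ok := snapshot.CSINodesStatus[nodeName]
        if !ok {
            return false // no CSINode object cached for this node yet
        }
        return cs.DriverStatus[driver]
    }
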
65 changes: 65 additions & 0 deletions pkg/scheduler/cache/event_handlers.go
@@ -19,6 +19,7 @@ package cache
 import (
     "context"
     "fmt"
+    "reflect"
     "strconv"
 
     v1 "k8s.io/api/core/v1"
@@ -30,6 +31,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 
+    sv1 "k8s.io/api/storage/v1"
     nodeinfov1alpha1 "volcano.sh/apis/pkg/apis/nodeinfo/v1alpha1"
     "volcano.sh/apis/pkg/apis/scheduling"
     "volcano.sh/apis/pkg/apis/scheduling/scheme"
@@ -402,6 +404,69 @@ func (sc *SchedulerCache) DeleteNode(obj interface{}) {
     }
 }
 
+func (sc *SchedulerCache) AddOrUpdateCSINode(obj interface{}) {
+    csiNode, ok := obj.(*sv1.CSINode)
+    if !ok {
+        return
+    }
+
+    var csiNodeStatus *schedulingapi.CSINodeStatusInfo
+    var found bool
+    sc.Mutex.Lock()
+    defer sc.Mutex.Unlock()
+    // update nodeVolumeCount
+
+    if csiNodeStatus, found = sc.CSINodesStatus[csiNode.Name]; !found {
+        csiNodeStatus = &schedulingapi.CSINodeStatusInfo{
+            CSINodeName: csiNode.Name,
+            DriverStatus: make(map[string]bool),
+        }
+        sc.CSINodesStatus[csiNode.Name] = csiNodeStatus
+    }
+
+    for i := range csiNode.Spec.Drivers {
+        d := csiNode.Spec.Drivers[i]
+        csiNodeStatus.DriverStatus[d.Name] = d.Allocatable != nil && d.Allocatable.Count != nil
+    }
+}
+
+func (sc *SchedulerCache) UpdateCSINode(oldObj, newObj interface{}) {
+    oldCSINode, ok := oldObj.(*sv1.CSINode)
+    if !ok {
+        return
+    }
+    newCSINode, ok := newObj.(*sv1.CSINode)
+    if !ok {
+        return
+    }
+    if reflect.DeepEqual(oldCSINode.Spec, newCSINode.Spec) {
+        return
+    }
+    sc.AddOrUpdateCSINode(newObj)
+}
+
+func (sc *SchedulerCache) DeleteCSINode(obj interface{}) {
+    var csiNode *sv1.CSINode
+    switch t := obj.(type) {
+    case *sv1.CSINode:
+        csiNode = obj.(*sv1.CSINode)
+    case cache.DeletedFinalStateUnknown:
+        var ok bool
+        csiNode, ok = t.Obj.(*sv1.CSINode)
+        if !ok {
+            klog.Errorf("Cannot convert to *sv1.CSINode: %v", obj)
+            return
+        }
+    default:
+        klog.Errorf("Cannot convert to *sv1.CSINode: %v", obj)
+        return
+    }
+
+    sc.Mutex.Lock()
+    delete(sc.CSINodesStatus, csiNode.Name)
+    sc.Mutex.Unlock()
+}
+
 func getJobID(pg *schedulingapi.PodGroup) schedulingapi.JobID {
     return schedulingapi.JobID(fmt.Sprintf("%s/%s", pg.Namespace, pg.Name))
 }
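
The new handlers keep the cache in line with CSINode events: add/update records, per driver, whether an allocatable volume count is published (skipping updates whose spec did not change), and delete drops the node's entry, handling the DeletedFinalStateUnknown tombstone case. A test-style sketch of the add path; constructing a bare SchedulerCache like this is an assumption made only for illustration (production code builds it via newSchedulerCache), and the driver names are invented:

    package main

    import (
        "fmt"

        sv1 "k8s.io/api/storage/v1"

        schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
        schedcache "volcano.sh/volcano/pkg/scheduler/cache"
    )

    func main() {
        // AddOrUpdateCSINode only touches Mutex and CSINodesStatus, so a bare
        // cache suffices for this sketch (an assumption, not the normal setup).
        sc := &schedcache.SchedulerCache{
            CSINodesStatus: map[string]*schedulingapi.CSINodeStatusInfo{},
        }

        count := int32(32)
        csiNode := &sv1.CSINode{}
        csiNode.Name = "node-1"
        csiNode.Spec.Drivers = []sv1.CSINodeDriver{
            {Name: "limited.csi.example.com", Allocatable: &sv1.VolumeNodeResources{Count: &count}},
            {Name: "unlimited.csi.example.com"}, // no Allocatable.Count published
        }

        sc.AddOrUpdateCSINode(csiNode)

        status := sc.CSINodesStatus["node-1"].DriverStatus
        fmt.Println(status["limited.csi.example.com"])   // true: driver reports a volume limit
        fmt.Println(status["unlimited.csi.example.com"]) // false: no limit published
    }
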
