diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go
index 6335dfe79ecf0..7a6578bd7830d 100644
--- a/cmd/kube-apiserver/app/server.go
+++ b/cmd/kube-apiserver/app/server.go
@@ -689,6 +689,10 @@ func postProcessOpenAPISpecForBackwardCompatibility(s *spec.Swagger) (*spec.Swag
 		"v1beta1.ClusterRoleList":      "k8s.io/kubernetes/pkg/apis/rbac/v1beta1.ClusterRoleList",
 		"v1beta1.ResourceAttributes":   "k8s.io/kubernetes/pkg/apis/authorization/v1beta1.ResourceAttributes",
 		"v1.Pod":                       "k8s.io/kubernetes/pkg/api/v1.Pod",
+		"v1.ResourceClass":             "k8s.io/kubernetes/pkg/api/v1.ResourceClass",
+		"v1.ResourceClassList":         "k8s.io/kubernetes/pkg/api/v1.ResourceClassList",
+		"v1.ResourceClassSpec":         "k8s.io/kubernetes/pkg/api/v1.ResourceClassSpec",
+		"v1.ResourceClassStatus":       "k8s.io/kubernetes/pkg/api/v1.ResourceClassStatus",
 		"v1.FCVolumeSource":            "k8s.io/kubernetes/pkg/api/v1.FCVolumeSource",
 		"v1beta1.SubresourceReference": "k8s.io/kubernetes/pkg/apis/extensions/v1beta1.SubresourceReference",
 		"v1.ResourceQuotaStatus":       "k8s.io/kubernetes/pkg/api/v1.ResourceQuotaStatus",
diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go
index 48f8d9b9fd317..7f2369cfa36dd 100644
--- a/cmd/kubelet/app/options/options.go
+++ b/cmd/kubelet/app/options/options.go
@@ -150,6 +150,7 @@ func (c *kubeletConfiguration) addFlags(fs *pflag.FlagSet) {
 	fs.BoolVar(&c.ExperimentalFailSwapOn, "experimental-fail-swap-on", c.ExperimentalFailSwapOn, "Makes the Kubelet fail to start if swap is enabled on the node. This is a temporary option to maintain legacy behavior; failing when swap is enabled will happen by default in v1.6.")
 	fs.StringVar(&c.PodManifestPath, "pod-manifest-path", c.PodManifestPath, "Path to the directory containing pod manifest files to run, or the path to a single pod manifest file. Files starting with dots will be ignored.")
+	fs.StringVar(&c.DeviceFilePath, "experimental-device-file-path", c.DeviceFilePath, "Path to the directory containing device yaml files, or the path to a single device file. Files starting with dots will be ignored.")
 	fs.DurationVar(&c.SyncFrequency.Duration, "sync-frequency", c.SyncFrequency.Duration, "Max period between synchronizing running containers and config")
 	fs.DurationVar(&c.FileCheckFrequency.Duration, "file-check-frequency", c.FileCheckFrequency.Duration, "Duration between checking config files for new data")
 	fs.DurationVar(&c.HTTPCheckFrequency.Duration, "http-check-frequency", c.HTTPCheckFrequency.Duration, "Duration between checking http for new data")
diff --git a/examples/resource-class/nvidia.tesla.gpu.yaml b/examples/resource-class/nvidia.tesla.gpu.yaml
new file mode 100644
index 0000000000000..8a762adcbba69
--- /dev/null
+++ b/examples/resource-class/nvidia.tesla.gpu.yaml
@@ -0,0 +1,14 @@
+apiVersion: v1
+kind: Device
+metadata:
+  name: nvidia-tesla-gpu
+  labels:
+    type: nvidia-gpu
+    quantity: "8"
+    compute-ability: "3.7"
+    memory: 10Gi
+    ecc: "true"
+    family: tesla
+    nvlink: "true"
+    model: k80
+
diff --git a/examples/resource-class/rclass.yml b/examples/resource-class/rclass.yml
new file mode 100644
index 0000000000000..d4dcb0d4f4408
--- /dev/null
+++ b/examples/resource-class/rclass.yml
@@ -0,0 +1,15 @@
+---
+apiVersion: v1
+kind: ResourceClass
+metadata:
+  name: my.rc2
+spec:
+  resourceSelector:
+    -
+      matchExpressions:
+        -
+          key: "type"
+          operator: "In"
+          values:
+            - "nvidia-gpu"
+
diff --git a/pkg/api/install/install.go b/pkg/api/install/install.go
index 53730940ae820..dafbbd4336884 100644
--- a/pkg/api/install/install.go
+++ b/pkg/api/install/install.go
@@ -44,6 +44,7 @@ func Install(groupFactoryRegistry announced.APIGroupFactoryRegistry, registry *r
 			"Namespace",
 			"PersistentVolume",
 			"ComponentStatus",
+			"ResourceClass",
 		),
 		IgnoredKinds: sets.NewString(
 			"ListOptions",
diff --git a/pkg/api/register.go b/pkg/api/register.go
index 5f261bdfb5b99..578d97cec6325 100644
--- a/pkg/api/register.go
+++ b/pkg/api/register.go
@@ -94,6 +94,9 @@ func addKnownTypes(scheme *runtime.Scheme) error {
 		&List{},
 		&LimitRange{},
 		&LimitRangeList{},
+		&ResourceClass{},
+		&ResourceClassList{},
+		&Device{},
 		&ResourceQuota{},
 		&ResourceQuotaList{},
 		&Namespace{},
diff --git a/pkg/api/types.go b/pkg/api/types.go
index 673d5c7769cd5..1a3f323306ea1 100644
--- a/pkg/api/types.go
+++ b/pkg/api/types.go
@@ -1325,6 +1325,45 @@ type EnvVarSource struct {
 	SecretKeyRef *SecretKeySelector
 }
 
+// +nonNamespaced=true
+// +genclient=true
+
+// ResourceClass describes a named class of resources (for example, devices)
+// that containers can request.
+type ResourceClass struct {
+	metav1.TypeMeta
+	// +optional
+	metav1.ObjectMeta
+
+	// Spec defines resources required
+	// +optional
+	Spec ResourceClassSpec
+	// +optional
+	Status ResourceClassStatus
+}
+
+// ResourceClassStatus is information about the current status of a resource class
+type ResourceClassStatus struct {
+	Allocatable int32
+	Request     int32
+}
+
+// ResourceClassSpec dictates features and properties of the devices targeted by a ResourceClass
+type ResourceClassSpec struct {
+	// ResourceSelector selects resources. ORed from each selector
+	ResourceSelector []ResourcePropertySelector
+	// +optional
+	SubResourcesCount int32
+}
+
+// ResourceClassList is a list of ResourceClass objects
+type ResourceClassList struct {
+	metav1.TypeMeta
+	// +optional
+	metav1.ListMeta
+	Items []ResourceClass
+}
+
 // ObjectFieldSelector selects an APIVersioned field of an object.
 type ObjectFieldSelector struct {
 	// Required: Version of the schema the FieldPath is written in terms of.
@@ -2859,6 +2898,44 @@ type NodeSystemInfo struct {
 	Architecture string
 }
 
+type DeviceSubResources struct {
+	// Name of the Device
+	Name string
+	// Count of devices
+	Quantity int32
+}
+
+// Device is a physical or logical device
+type Device struct {
+	metav1.TypeMeta
+	// +optional
+	metav1.ObjectMeta
+	// Count of devices
+	Quantity int32
+	// Device can be a group of several other devices
+	// +optional
+	SubResources DeviceSubResources
+}
+
+type ResourceSelectorOperator NodeSelectorOperator
+
+// A resource selector requirement is a selector that contains values, a key, and an operator
+// that relates the key and values
+type ResourceSelectorRequirement struct {
+	// The label key that the selector applies to
+	Key string
+	// Values to match; for example "0.1", "intel"
+	Values []string
+	// Operator represents the key's relationship to the values
+	Operator ResourceSelectorOperator
+}
+
+// A null or empty selector matches no resources
+type ResourcePropertySelector struct {
+	// A list of resource/device selector requirements. The requirements are ANDed
+	MatchExpressions []ResourceSelectorRequirement
+}
+
 // NodeStatus is information about the current status of a node.
 type NodeStatus struct {
 	// Capacity represents the total resources of a node.
@@ -2891,6 +2968,12 @@ type NodeStatus struct {
 	// List of volumes that are attached to the node.
 	// +optional
 	VolumesAttached []AttachedVolume
+	// Total devices that are attached to the node.
+	// +optional
+	CapacityDevices []Device
+	// Allocatable devices that are attached to the node.
+	// +optional
+	AllocatableDevices []Device
 }
 
 type UniqueVolumeName string
diff --git a/pkg/api/v1/annotation_key_constants.go b/pkg/api/v1/annotation_key_constants.go
index 8591f08b4b58e..5edb3d4b393a7 100644
--- a/pkg/api/v1/annotation_key_constants.go
+++ b/pkg/api/v1/annotation_key_constants.go
@@ -34,6 +34,10 @@ const (
 	// in the Annotations of a Node.
 	TaintsAnnotationKey string = "scheduler.alpha.kubernetes.io/taints"
 
+	// ResClassPodAnnotationKeyPrefix represents the key of a device allocated
+	// to one container of a pod.
+	ResClassPodAnnotationKeyPrefix string = "scheduler.alpha.kubernetes.io/resClass"
+
 	// SeccompPodAnnotationKey represents the key of a seccomp profile applied
 	// to all containers of a pod.
 	SeccompPodAnnotationKey string = "seccomp.security.alpha.kubernetes.io/pod"
diff --git a/pkg/api/v1/helper/helpers.go b/pkg/api/v1/helper/helpers.go
index 6b51c5c8364b7..b2db31bed3f2c 100644
--- a/pkg/api/v1/helper/helpers.go
+++ b/pkg/api/v1/helper/helpers.go
@@ -169,6 +169,40 @@ func containsAccessMode(modes []v1.PersistentVolumeAccessMode, mode v1.Persisten
 	return false
 }
 
+// ResourceSelectorRequirementsAsSelector converts the []ResourceSelectorRequirement api type into a struct that implements
+// labels.Selector.
+func ResourceSelectorRequirementsAsSelector(rsm []v1.ResourceSelectorRequirement) (labels.Selector, error) {
+	if len(rsm) == 0 {
+		return labels.Nothing(), nil
+	}
+	selector := labels.NewSelector()
+	for _, expr := range rsm {
+		var op selection.Operator
+		switch expr.Operator {
+		case v1.ResourceSelectorOpIn:
+			op = selection.In
+		case v1.ResourceSelectorOpNotIn:
+			op = selection.NotIn
+		case v1.ResourceSelectorOpExists:
+			op = selection.Exists
+		case v1.ResourceSelectorOpDoesNotExist:
+			op = selection.DoesNotExist
+		case v1.ResourceSelectorOpGt:
+			op = selection.GreaterThan
+		case v1.ResourceSelectorOpLt:
+			op = selection.LessThan
+		default:
+			return nil, fmt.Errorf("%q is not a valid resource selector operator", expr.Operator)
+		}
+		r, err := labels.NewRequirement(expr.Key, op, expr.Values)
+		if err != nil {
+			return nil, err
+		}
+		selector = selector.Add(*r)
+	}
+	return selector, nil
+}
+
 // NodeSelectorRequirementsAsSelector converts the []NodeSelectorRequirement api type into a struct that implements
 // labels.Selector.
 func NodeSelectorRequirementsAsSelector(nsm []v1.NodeSelectorRequirement) (labels.Selector, error) {
diff --git a/pkg/api/v1/register.go b/pkg/api/v1/register.go
index c70d6ac5615f0..41e64eef54703 100644
--- a/pkg/api/v1/register.go
+++ b/pkg/api/v1/register.go
@@ -72,6 +72,9 @@ func addKnownTypes(scheme *runtime.Scheme) error {
 		&List{},
 		&LimitRange{},
 		&LimitRangeList{},
+		&ResourceClass{},
+		&ResourceClassList{},
+		&Device{},
 		&ResourceQuota{},
 		&ResourceQuotaList{},
 		&Namespace{},
diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go
index 1e5d2bef473b6..4b0b7f489f4f5 100644
--- a/pkg/api/v1/types.go
+++ b/pkg/api/v1/types.go
@@ -1421,6 +1421,51 @@ type EnvVarSource struct {
 	SecretKeyRef *SecretKeySelector `json:"secretKeyRef,omitempty" protobuf:"bytes,4,opt,name=secretKeyRef"`
 }
 
+// +nonNamespaced=true
+// +genclient=true
+
+// ResourceClass describes a named class of resources (for example, devices)
+// that containers can request.
+type ResourceClass struct {
+	metav1.TypeMeta `json:",inline"`
+	// Standard object's metadata.
+	// +optional
+	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
+
+	// Spec defines resources required
+	// +optional
+	Spec ResourceClassSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
+	// Most recently observed status of the resource class.
+	// Populated by the system.
+	// Read-only.
+	// +optional
+	Status ResourceClassStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
+}
+
+// ResourceClassSpec defines resources required
+type ResourceClassSpec struct {
+	// ResourceSelector selects resources
+	ResourceSelector []ResourcePropertySelector `json:"resourceSelector" protobuf:"bytes,1,rep,name=resourceSelector"`
+	// +optional
+	SubResourcesCount int32 `json:"subResourcesCount" protobuf:"varint,2,opt,name=subResourcesCount"`
+}
+
+// ResourceClassStatus is information about the current status of a resource class
+type ResourceClassStatus struct {
+	Allocatable int32 `json:"allocatable" protobuf:"varint,1,opt,name=allocatable"`
+	Request     int32 `json:"request" protobuf:"varint,2,opt,name=request"`
+}
+
+// ResourceClassList is a list of ResourceClass objects
+type ResourceClassList struct {
+	metav1.TypeMeta `json:",inline"`
+	// Standard list metadata.
+	// +optional
+	metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
+	// List of resource classes
+	Items []ResourceClass `json:"items" protobuf:"bytes,2,rep,name=items"`
+}
+
 // ObjectFieldSelector selects an APIVersioned field of an object.
 type ObjectFieldSelector struct {
 	// Version of the schema the FieldPath is written in terms of, defaults to "v1".
@@ -3279,6 +3324,59 @@ type NodeSystemInfo struct {
 	Architecture string `json:"architecture" protobuf:"bytes,10,opt,name=architecture"`
 }
 
+type DeviceSubResources struct {
+	// Name of the Device
+	Name string `json:"name" protobuf:"bytes,1,opt,name=name"`
+	// Count of devices
+	Quantity int32 `json:"quantity" protobuf:"varint,2,opt,name=quantity"`
+}
+
+// +nonNamespaced=true
+
+// Device is a physical or logical device
+type Device struct {
+	metav1.TypeMeta `json:",inline"`
+	// Standard object's metadata.
+	// +optional
+	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
+	// Count of such devices on the node
+	Quantity int32 `json:"quantity" protobuf:"varint,2,opt,name=quantity"`
+	// Device can be a group of several other devices
+	// +optional
+	SubResources DeviceSubResources `json:"subResources" protobuf:"bytes,3,opt,name=subResources"`
+}
+
+type ResourceSelectorOperator string
+
+const (
+	ResourceSelectorOpIn           ResourceSelectorOperator = "In"
+	ResourceSelectorOpNotIn        ResourceSelectorOperator = "NotIn"
+	ResourceSelectorOpExists       ResourceSelectorOperator = "Exists"
+	ResourceSelectorOpDoesNotExist ResourceSelectorOperator = "DoesNotExist"
+	ResourceSelectorOpGt           ResourceSelectorOperator = "Gt"
+	ResourceSelectorOpLt           ResourceSelectorOperator = "Lt"
+)
+
+// A resource selector requirement is a selector that contains values, a key, and an operator
+// that relates the key and values
+type ResourceSelectorRequirement struct {
+	// The label key that the selector applies to
+	// +patchMergeKey=key
+	// +patchStrategy=merge
+	Key string `json:"key" patchStrategy:"merge" patchMergeKey:"key" protobuf:"bytes,1,opt,name=key"`
+	// Values to match; for example "0.1", "intel"
+	// +optional
+	Values []string `json:"values,omitempty" protobuf:"bytes,2,rep,name=values"`
+	// Operator represents the key's relationship to the values
+	Operator ResourceSelectorOperator `json:"operator" protobuf:"bytes,3,opt,name=operator,casttype=ResourceSelectorOperator"`
+}
+
+// A null or empty selector matches no resources
+type ResourcePropertySelector struct {
+	// A list of resource/device selector requirements. ANDed from each ResourceSelectorRequirement
+	MatchExpressions []ResourceSelectorRequirement `json:"matchExpressions" protobuf:"bytes,1,rep,name=matchExpressions"`
+}
+
 // NodeStatus is information about the current status of a node.
 type NodeStatus struct {
 	// Capacity represents the total resources of a node.
@@ -3323,6 +3421,12 @@ type NodeStatus struct {
 	// List of volumes that are attached to the node.
 	// +optional
 	VolumesAttached []AttachedVolume `json:"volumesAttached,omitempty" protobuf:"bytes,10,rep,name=volumesAttached"`
+	// Total devices that are attached to the node.
+	// +optional
+	CapacityDevices []Device `json:"deviceCapacity,omitempty" protobuf:"bytes,11,rep,name=deviceCapacity"`
+	// Allocatable devices that are attached to the node.
+	// +optional
+	AllocatableDevices []Device `json:"deviceAllocatable,omitempty" protobuf:"bytes,12,rep,name=deviceAllocatable"`
 }
 
 type UniqueVolumeName string
diff --git a/pkg/api/v1/validation/validation.go b/pkg/api/v1/validation/validation.go
index c73f9ab2c865e..90f2d22fce7c2 100644
--- a/pkg/api/v1/validation/validation.go
+++ b/pkg/api/v1/validation/validation.go
@@ -57,7 +57,8 @@ func ValidateResourceRequirements(requirements *v1.ResourceRequirements, fldPath
 	for resourceName, quantity := range requirements.Requests {
 		fldPath := reqPath.Key(string(resourceName))
 		// Validate resource name.
-		allErrs = append(allErrs, validateContainerResourceName(string(resourceName), fldPath)...)
+		//TODO(vikasc): resource name validation is switched off because resource class names would fail it and need their own handling. One way, though very restrictive, could be to prefix resource class names with a special string like "res-class-".
+		//allErrs = append(allErrs, validateContainerResourceName(string(resourceName), fldPath)...)
 		// Validate resource quantity.
 		allErrs = append(allErrs, ValidateResourceQuantityValue(string(resourceName), quantity, fldPath)...)
 	}
diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go
index d98a5d9a82975..b5939008d3e3f 100644
--- a/pkg/api/validation/validation.go
+++ b/pkg/api/validation/validation.go
@@ -240,6 +240,7 @@ func maskTrailingDash(name string) string {
 // Prefix indicates this name will be used as part of generation, in which case
 // trailing dashes are allowed.
 var ValidatePodName = NameIsDNSSubdomain
+var ValidateResourceClassName = NameIsDNSSubdomain
 
 // ValidateReplicationControllerName can be used to check whether the given replication
 // controller name is valid.
@@ -2156,6 +2157,13 @@ func ValidateTolerations(tolerations []api.Toleration, fldPath *field.Path) fiel
 	return allErrors
 }
 
+// ValidateResourceClass tests if required fields in the resource class are set.
+func ValidateResourceClass(rClass *api.ResourceClass) field.ErrorList {
+	fldPath := field.NewPath("metadata")
+	allErrs := ValidateObjectMeta(&rClass.ObjectMeta, false, ValidateResourceClassName, fldPath)
+	return allErrs
+}
+
 // ValidatePod tests if required fields in the pod are set.
 func ValidatePod(pod *api.Pod) field.ErrorList {
 	fldPath := field.NewPath("metadata")
@@ -3209,6 +3217,15 @@ func ValidateNode(node *api.Node) field.ErrorList {
 	// TODO(rjnagal): Ignore PodCIDR till its completely implemented.
 	return allErrs
 }
+
+// ValidateResourceClassUpdate tests to make sure a resource class update can be applied.
+func ValidateResourceClassUpdate(rClass, oldResClass *api.ResourceClass) field.ErrorList {
+	//TODO(vikasc): Add implementation
+	var allErrs field.ErrorList
+	//oldResClass.Status.Allocatable = rClass.Status.Allocatable
+	//oldResClass.Status.Request = rClass.Status.Request
+	return allErrs
+}
 
 // ValidateNodeUpdate tests to make sure a node update can be applied. Modifies oldNode.
 func ValidateNodeUpdate(node, oldNode *api.Node) field.ErrorList {
@@ -3649,7 +3666,7 @@ func ValidateResourceRequirements(requirements *api.ResourceRequirements, fldPat
 	for resourceName, quantity := range requirements.Requests {
 		fldPath := reqPath.Key(string(resourceName))
 		// Validate resource name.
-		allErrs = append(allErrs, validateContainerResourceName(string(resourceName), fldPath)...)
+		//allErrs = append(allErrs, validateContainerResourceName(string(resourceName), fldPath)...) // switched off: see the matching TODO in pkg/api/v1/validation
 		// Validate resource quantity.
 		allErrs = append(allErrs, ValidateResourceQuantityValue(string(resourceName), quantity, fldPath)...)
 	}
diff --git a/pkg/apis/componentconfig/types.go b/pkg/apis/componentconfig/types.go
index 4c99e4a42c067..ca189af51ba50 100644
--- a/pkg/apis/componentconfig/types.go
+++ b/pkg/apis/componentconfig/types.go
@@ -176,6 +176,9 @@ const (
 type KubeletConfiguration struct {
 	metav1.TypeMeta
 
+	// DeviceFilePath is the path to the directory containing device yaml files,
+	// or to a single device file
+	DeviceFilePath string
 	// podManifestPath is the path to the directory containing pod manifests to
 	// run, or the path to a single manifest file
 	PodManifestPath string
diff --git a/pkg/apis/componentconfig/v1alpha1/types.go b/pkg/apis/componentconfig/v1alpha1/types.go
index 64897246078b7..a5ef984d63da4 100644
--- a/pkg/apis/componentconfig/v1alpha1/types.go
+++ b/pkg/apis/componentconfig/v1alpha1/types.go
@@ -250,6 +250,9 @@ type LeaderElectionConfiguration struct {
 type KubeletConfiguration struct {
 	metav1.TypeMeta `json:",inline"`
 
+	// DeviceFilePath is the path to the directory containing device yaml files,
+	// or to a single device file
+	DeviceFilePath string `json:"deviceFilePath"`
 	// podManifestPath is the path to the directory containing pod manifests to
 	// run, or the path to a single manifest file
 	PodManifestPath string `json:"podManifestPath"`
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 60b9d3d9acd15..591beab68a948 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -409,6 +409,12 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
 	}
 	nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
 
+	resClassIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
+	if kubeDeps.KubeClient != nil {
+		resClassLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "resourceclasses", metav1.NamespaceAll, fields.Everything())
+		cache.NewReflector(resClassLW, &v1.ResourceClass{}, resClassIndexer, 0).Run()
+	}
+	resClassLister := &predicates.CachedResourceClassInfo{ResourceClassLister: corelisters.NewResourceClassLister(resClassIndexer)}
 	// TODO: get the real node object of ourself,
 	// and use the real node name and UID.
 	// TODO: what is namespace for node?
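
A note on how the pieces introduced so far fit together: the kubelet reflects `resourceclasses` into `resClassIndexer`, and each `Device` advertises plain labels that a `ResourceClass` selector matches against. Below is a minimal sketch of that matching, built only on the v1 types and the `ResourceSelectorRequirementsAsSelector` helper from this patch; the `matchDevice` function itself is hypothetical, and the OR-across-selectors / AND-within-matchExpressions semantics are taken from the type comments above.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/kubernetes/pkg/api/v1"
	v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
)

// matchDevice reports whether a device satisfies a ResourceClass: selectors in
// Spec.ResourceSelector are ORed, while the requirements inside a single
// ResourcePropertySelector are ANDed by the returned labels.Selector.
func matchDevice(class *v1.ResourceClass, device *v1.Device) (bool, error) {
	for _, ps := range class.Spec.ResourceSelector {
		sel, err := v1helper.ResourceSelectorRequirementsAsSelector(ps.MatchExpressions)
		if err != nil {
			return false, err
		}
		if sel.Matches(labels.Set(device.Labels)) {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	// Mirrors examples/resource-class/rclass.yml.
	class := &v1.ResourceClass{
		Spec: v1.ResourceClassSpec{
			ResourceSelector: []v1.ResourcePropertySelector{{
				MatchExpressions: []v1.ResourceSelectorRequirement{{
					Key:      "type",
					Operator: v1.ResourceSelectorOpIn,
					Values:   []string{"nvidia-gpu"},
				}},
			}},
		},
	}
	// Mirrors examples/resource-class/nvidia.tesla.gpu.yaml.
	device := &v1.Device{}
	device.Labels = map[string]string{"type": "nvidia-gpu", "family": "tesla"}

	ok, err := matchDevice(class, device)
	fmt.Println(ok, err) // true <nil>
}
```

Label keys are case-sensitive, which is why the `rclass.yml` example selects on `type`, matching the device example's label key exactly.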
@@ -452,6 +458,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
 		clusterDNS:     clusterDNS,
 		serviceLister:  serviceLister,
 		nodeInfo:       nodeInfo,
+		resClassInfo:   resClassLister,
 		masterServiceNamespace: kubeCfg.MasterServiceNamespace,
 		streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
 		recorder: kubeDeps.Recorder,
@@ -485,6 +492,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
 		iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
 		iptablesDropBit:       int(kubeCfg.IPTablesDropBit),
 		experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
+		experimentalDeviceFilePath:              kubeCfg.DeviceFilePath,
 	}
 
 	secretManager := secret.NewCachingSecretManager(
@@ -795,7 +803,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
 	klet.AddPodSyncHandler(activeDeadlineHandler)
 
 	criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
-	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler))
+	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, klet.listResClassesAnyWay, criticalPodAdmissionHandler))
 	// apply functional Option's
 	for _, opt := range kubeDeps.Options {
 		opt(klet)
@@ -883,7 +891,8 @@ type Kubelet struct {
 	// serviceLister knows how to list services
 	serviceLister serviceLister
 	// nodeInfo knows how to get information about the node for this kubelet.
-	nodeInfo predicates.NodeInfo
+	nodeInfo     predicates.NodeInfo
+	resClassInfo predicates.ResClassInfo
 
 	// a list of node labels to register
 	nodeLabels map[string]string
@@ -1103,7 +1112,8 @@ type Kubelet struct {
 	// dockerLegacyService contains some legacy methods for backward compatibility.
 	// It should be set only when docker is using non json-file logging driver.
-	dockerLegacyService dockershim.DockerLegacyService
+	dockerLegacyService        dockershim.DockerLegacyService
+	experimentalDeviceFilePath string
 }
 
 func initializeServerCertificateManager(kubeClient clientset.Interface, kubeCfg *componentconfig.KubeletConfiguration, nodeName types.NodeName, ips []net.IP, hostnames []string) (certificate.Manager, error) {
diff --git a/pkg/kubelet/kubelet_getters.go b/pkg/kubelet/kubelet_getters.go
index e7c3591d8099e..6e4d51a5c77df 100644
--- a/pkg/kubelet/kubelet_getters.go
+++ b/pkg/kubelet/kubelet_getters.go
@@ -209,6 +209,14 @@ func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
 	return kl.initialNode()
 }
 
+// listResClassesAnyWay returns the cached resource classes, tolerating lister
+// errors so that pod admission can proceed without resource class data.
+func (kl *Kubelet) listResClassesAnyWay() ([]*v1.ResourceClass, error) {
+	if rcList, err := kl.resClassInfo.ListResClasses(); err == nil {
+		return rcList, nil
+	}
+	return nil, nil
+}
+
 // GetNodeConfig returns the container manager node config.
 func (kl *Kubelet) GetNodeConfig() cm.NodeConfig {
 	return kl.containerManager.GetNodeConfig()
diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index eb8712209312e..41c5efb220617 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -504,6 +504,22 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
 }
 
 func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
+	// Note(VikasC): This is a hack to populate devices for the ResourceClass PoC
+	if kl.experimentalDeviceFilePath != "" {
+		devicesP, err := nodeutil.ReadDeviceFiles(kl.experimentalDeviceFilePath)
+		if err != nil {
+			glog.Errorf("Error while reading device files: %v", err)
+		}
+		if len(devicesP) != 0 {
+			var devicesV []v1.Device
+			for _, d := range devicesP {
+				devicesV = append(devicesV, *d)
+			}
+			node.Status.CapacityDevices = devicesV
+			node.Status.AllocatableDevices = devicesV
+		}
+	}
+
 	// Note: avoid blindly overwriting the capacity in case opaque
 	// resources are being advertised.
 	if node.Status.Capacity == nil {
diff --git a/pkg/kubelet/lifecycle/predicate.go b/pkg/kubelet/lifecycle/predicate.go
index 82e255caa1227..487b504564b83 100644
--- a/pkg/kubelet/lifecycle/predicate.go
+++ b/pkg/kubelet/lifecycle/predicate.go
@@ -28,6 +28,7 @@ import (
 )
 
 type getNodeAnyWayFuncType func() (*v1.Node, error)
+type listResClassesAnyWayFuncType func() ([]*v1.ResourceClass, error)
 
 // AdmissionFailureHandler is an interface which defines how to deal with a failure to admit a pod.
 // This allows for the graceful handling of pod admission failure.
@@ -36,15 +37,17 @@ type AdmissionFailureHandler interface {
 }
 
 type predicateAdmitHandler struct {
-	getNodeAnyWayFunc       getNodeAnyWayFuncType
-	admissionFailureHandler AdmissionFailureHandler
+	getNodeAnyWayFunc        getNodeAnyWayFuncType
+	listResClassesAnyWayFunc listResClassesAnyWayFuncType
+	admissionFailureHandler  AdmissionFailureHandler
 }
 
 var _ PodAdmitHandler = &predicateAdmitHandler{}
 
-func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType, admissionFailureHandler AdmissionFailureHandler) *predicateAdmitHandler {
+func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType, listResClassesAnyWayFunc listResClassesAnyWayFuncType, admissionFailureHandler AdmissionFailureHandler) *predicateAdmitHandler {
 	return &predicateAdmitHandler{
 		getNodeAnyWayFunc,
+		listResClassesAnyWayFunc,
 		admissionFailureHandler,
 	}
 }
@@ -59,10 +62,43 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
 			Message: "Kubelet cannot get node info.",
 		}
 	}
+	rClasses, err := w.listResClassesAnyWayFunc()
+	if err != nil {
+		glog.Errorf("Cannot list resource classes: %v", err)
+		return PodAdmitResult{
+			Admit:   false,
+			Reason:  "InvalidResourceClassInfo",
+			Message: "Kubelet cannot list resource class info.",
+		}
+	}
 	pod := attrs.Pod
 	pods := attrs.OtherPods
-	nodeInfo := schedulercache.NewNodeInfo(pods...)
-	nodeInfo.SetNode(node)
+	nodeInfo := schedulercache.NewNodeInfo()
+	for _, rClass := range rClasses {
+		_, err := nodeInfo.AddResourceClass(rClass, node)
+		if err != nil {
+			message := fmt.Sprintf("AddResourceClass failed due to %v, which is unexpected.", err)
+			glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
+			return PodAdmitResult{
+				Admit:   false,
+				Reason:  "UnexpectedAdmissionError",
+				Message: message,
+			}
+		}
+	}
+	for _, p := range pods {
+		_, _, err := nodeInfo.OnAddUpdateResClassToDeviceMappingForPod(p)
+		if err != nil {
+			message := fmt.Sprintf("OnAddUpdateResClassToDeviceMappingForPod failed due to %v, which is unexpected.", err)
+			glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
+			return PodAdmitResult{
+				Admit:   false,
+				Reason:  "UnexpectedAdmissionError",
+				Message: message,
+			}
+		}
+	}
+	nodeInfo.SetNode(node, pods...)
 	fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
 	if err != nil {
 		message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
diff --git a/pkg/registry/core/resourceclass/BUILD b/pkg/registry/core/resourceclass/BUILD
new file mode 100644
index 0000000000000..9a64b8b6ae182
--- /dev/null
+++ b/pkg/registry/core/resourceclass/BUILD
@@ -0,0 +1,68 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+    "go_test",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "doc.go",
+        "registry.go",
+        "strategy.go",
+    ],
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api:go_default_library",
+        "//pkg/api/validation:go_default_library",
+        "//pkg/kubelet/client:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library",
+    ],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = ["strategy_test.go"],
+    library = ":go_default_library",
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api:go_default_library",
+        "//pkg/api/testing:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [
+        ":package-srcs",
+        "//pkg/registry/core/resourceclass/rest:all-srcs",
+        "//pkg/registry/core/resourceclass/storage:all-srcs",
+    ],
+    tags = ["automanaged"],
+)
diff --git a/pkg/registry/core/resourceclass/doc.go b/pkg/registry/core/resourceclass/doc.go
new file mode 100644
index 0000000000000..e7506b2bd7615
--- /dev/null
+++ b/pkg/registry/core/resourceclass/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package resourceclass provides Registry interface and implementation for storing ResourceClasses.
+package resourceclass // import "k8s.io/kubernetes/pkg/registry/core/resourceclass"
diff --git a/pkg/registry/core/resourceclass/registry.go b/pkg/registry/core/resourceclass/registry.go
new file mode 100644
index 0000000000000..365de5959a7c5
--- /dev/null
+++ b/pkg/registry/core/resourceclass/registry.go
@@ -0,0 +1,83 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceclass
+
+import (
+	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/watch"
+	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/registry/rest"
+	"k8s.io/kubernetes/pkg/api"
+)
+
+// Registry is an interface for things that know how to store ResourceClasses.
+type Registry interface {
+	ListResourceClasses(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (*api.ResourceClassList, error)
+	CreateResourceClass(ctx genericapirequest.Context, resClass *api.ResourceClass) error
+	UpdateResourceClass(ctx genericapirequest.Context, resClass *api.ResourceClass) error
+	GetResourceClass(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (*api.ResourceClass, error)
+	DeleteResourceClass(ctx genericapirequest.Context, name string) error
+	WatchResourceClasses(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error)
+}
+
+// storage puts strong typing around storage calls
+type storage struct {
+	rest.StandardStorage
+}
+
+// NewRegistry returns a new Registry interface for the given Storage. Any mismatched
+// types will panic.
+func NewRegistry(s rest.StandardStorage) Registry {
+	return &storage{s}
+}
+
+func (s *storage) ListResourceClasses(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (*api.ResourceClassList, error) {
+	obj, err := s.List(ctx, options)
+	if err != nil {
+		return nil, err
+	}
+
+	return obj.(*api.ResourceClassList), nil
+}
+
+func (s *storage) CreateResourceClass(ctx genericapirequest.Context, resClass *api.ResourceClass) error {
+	_, err := s.Create(ctx, resClass, false)
+	return err
+}
+
+func (s *storage) UpdateResourceClass(ctx genericapirequest.Context, resClass *api.ResourceClass) error {
+	_, _, err := s.Update(ctx, resClass.Name, rest.DefaultUpdatedObjectInfo(resClass, api.Scheme))
+	return err
+}
+
+func (s *storage) WatchResourceClasses(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
+	return s.Watch(ctx, options)
+}
+
+func (s *storage) GetResourceClass(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (*api.ResourceClass, error) {
+	obj, err := s.Get(ctx, name, options)
+	if err != nil {
+		return nil, err
+	}
+	return obj.(*api.ResourceClass), nil
+}
+
+func (s *storage) DeleteResourceClass(ctx genericapirequest.Context, name string) error {
+	_, _, err := s.Delete(ctx, name, nil)
+	return err
+}
diff --git a/pkg/registry/core/resourceclass/storage/BUILD b/pkg/registry/core/resourceclass/storage/BUILD
new file mode 100644
index 0000000000000..c55b220daa334
--- /dev/null
+++ b/pkg/registry/core/resourceclass/storage/BUILD
@@ -0,0 +1,61 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+    "go_test",
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = ["storage_test.go"],
+    library = ":go_default_library",
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api:go_default_library",
+        "//pkg/kubelet/client:go_default_library",
+        "//pkg/registry/registrytest:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
+    ],
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = ["storage.go"],
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api:go_default_library",
+        "//pkg/api/v1:go_default_library",
+        "//pkg/kubelet/client:go_default_library",
+        "//pkg/registry/cachesize:go_default_library",
+        "//pkg/registry/core/node:go_default_library",
+        "//pkg/registry/core/node/rest:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+)
diff --git a/pkg/registry/core/resourceclass/storage/storage.go b/pkg/registry/core/resourceclass/storage/storage.go
new file mode 100644
index 0000000000000..edcfef2b3770d
--- /dev/null
+++ b/pkg/registry/core/resourceclass/storage/storage.go
@@ -0,0 +1,101 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	//"fmt"
+	//"net/http"
+	//"net/url"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/registry/generic"
+	genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
+	"k8s.io/apiserver/pkg/registry/rest"
+	"k8s.io/kubernetes/pkg/api"
+	//"k8s.io/kubernetes/pkg/api/v1"
+	"k8s.io/kubernetes/pkg/registry/cachesize"
+	"k8s.io/kubernetes/pkg/registry/core/resourceclass"
+)
+
+// ResourceClassStorage includes storage for resource class and all sub resources
+//type ResourceClassStorage struct {
+//	Node *REST
+//	Status *StatusREST
+//}
+
+type REST struct {
+	*genericregistry.Store
+	//connection     client.ConnectionInfoGetter
+	//proxyTransport http.RoundTripper
+}
+
+// StatusREST implements the REST endpoint for changing the status of resource class.
+type StatusREST struct {
+	store *genericregistry.Store
+}
+
+func (r *StatusREST) New() runtime.Object {
+	return &api.ResourceClass{}
+}
+
+// Get retrieves the object from the storage. It is required to support Patch.
+func (r *StatusREST) Get(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
+	return r.store.Get(ctx, name, options)
+}
+
+// Update alters the status subset of an object.
+func (r *StatusREST) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) {
+	return r.store.Update(ctx, name, objInfo)
+}
+
+// NewStorage returns a ResourceClassStorage object that will work against resource classes.
+//func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client.KubeletClientConfig, proxyTransport http.RoundTripper) (*NodeStorage, error) {
+func NewStorage(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST) {
+	store := &genericregistry.Store{
+		Copier:      api.Scheme,
+		NewFunc:     func() runtime.Object { return &api.ResourceClass{} },
+		NewListFunc: func() runtime.Object { return &api.ResourceClassList{} },
+		//PredicateFunc: node.MatchNode,
+		PredicateFunc:     resourceclass.MatchResourceClass,
+		QualifiedResource: api.Resource("resourceclasses"),
+		WatchCacheSize:    cachesize.GetWatchCacheSizeByResource("resourceclasses"),
+
+		CreateStrategy: resourceclass.Strategy,
+		UpdateStrategy: resourceclass.Strategy,
+		DeleteStrategy: resourceclass.Strategy,
+		ExportStrategy: resourceclass.Strategy,
+	}
+	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: resourceclass.GetAttrs}
+	if err := store.CompleteWithOptions(options); err != nil {
+		panic(err)
+	}
+
+	statusStore := *store
+	statusStore.UpdateStrategy = resourceclass.StatusStrategy
+
+	return &REST{Store: store}, &StatusREST{store: &statusStore}
+}
+
+// Implement ShortNamesProvider
+var _ rest.ShortNamesProvider = &REST{}
+
+// ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource.
+func (r *REST) ShortNames() []string {
+	return []string{"resourceclass"}
+}
diff --git a/pkg/registry/core/resourceclass/storage/storage_test.go b/pkg/registry/core/resourceclass/storage/storage_test.go
new file mode 100644
index 0000000000000..f5c9290778bf3
--- /dev/null
+++ b/pkg/registry/core/resourceclass/storage/storage_test.go
@@ -0,0 +1,159 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"testing"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apiserver/pkg/registry/generic"
+	etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/registry/registrytest"
+)
+
+func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
+	etcdStorage, server := registrytest.NewEtcdStorage(t, "")
+	restOptions := generic.RESTOptions{
+		StorageConfig:           etcdStorage,
+		Decorator:               generic.UndecoratedStorage,
+		DeleteCollectionWorkers: 1,
+		ResourcePrefix:          "resourceclasses",
+	}
+	storage, _ := NewStorage(restOptions)
+	return storage, server
+}
+
+func validNewResourceClass() *api.ResourceClass {
+	return &api.ResourceClass{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "foo",
+			Labels: map[string]string{
+				"name": "foo",
+			},
+		},
+		Spec: api.ResourceClassSpec{
+			ResourceSelector: []api.ResourcePropertySelector{
+				{
+					MatchExpressions: []api.ResourceSelectorRequirement{
+						{Key: "type", Operator: "In", Values: []string{"nvidia-gpu"}},
+					},
+				},
+			},
+		},
+	}
+}
+
+func TestCreate(t *testing.T) {
+	storage, server := newStorage(t)
+	defer server.Terminate(t)
+	defer storage.Store.DestroyFunc()
+	test := registrytest.New(t, storage.Store).ClusterScope()
+	resourceclass := validNewResourceClass()
+	resourceclass.ObjectMeta = metav1.ObjectMeta{GenerateName: "foo"}
+	test.TestCreate(
+		// valid
+		resourceclass,
+		// invalid
+		&api.ResourceClass{
+			ObjectMeta: metav1.ObjectMeta{Name: "_-a123-a_"},
+		},
+	)
+}
+
+func TestUpdate(t *testing.T) {
+	storage, server := newStorage(t)
+	defer server.Terminate(t)
+	defer storage.Store.DestroyFunc()
+	test := registrytest.New(t, storage.Store).ClusterScope()
+	test.TestUpdate(
+		// valid
+		validNewResourceClass(),
+		// updateFunc
+		func(obj runtime.Object) runtime.Object {
+			object := obj.(*api.ResourceClass)
+			object.Spec.SubResourcesCount++
+			return object
+		},
+	)
+}
+
+func TestDelete(t *testing.T) {
+	storage, server := newStorage(t)
+	defer server.Terminate(t)
+	defer storage.Store.DestroyFunc()
+	test := registrytest.New(t, storage.Store).ClusterScope()
+	test.TestDelete(validNewResourceClass())
+}
+
+func TestGet(t *testing.T) {
+	storage, server := newStorage(t)
+	defer server.Terminate(t)
+	defer storage.Store.DestroyFunc()
+	test := registrytest.New(t, storage.Store).ClusterScope()
+	test.TestGet(validNewResourceClass())
+}
+
+func TestList(t *testing.T) {
+	storage, server := newStorage(t)
+	defer server.Terminate(t)
+	defer storage.Store.DestroyFunc()
+	test := registrytest.New(t, storage.Store).ClusterScope()
+	test.TestList(validNewResourceClass())
+}
+
+func TestWatch(t *testing.T) {
+	storage, server := newStorage(t)
+	defer server.Terminate(t)
+	defer storage.Store.DestroyFunc()
+	test := registrytest.New(t, storage.Store).ClusterScope()
+	test.TestWatch(
+		validNewResourceClass(),
+		// matching labels
+		[]labels.Set{
+			{"name": "foo"},
+		},
+		// not matching labels
+		[]labels.Set{
+			{"name": "bar"},
+			{"foo": "bar"},
+		},
+		// matching fields
+		[]fields.Set{
+			{"metadata.name": "foo"},
+		},
+		// not matching fields
+		[]fields.Set{
+			{"metadata.name": "bar"},
+		},
+	)
+}
+
+func TestShortNames(t *testing.T) {
+	storage, server := newStorage(t)
+	defer server.Terminate(t)
+	defer storage.Store.DestroyFunc()
+	expected := []string{"resourceclass"}
+	registrytest.AssertShortNames(t, storage, expected)
+}
diff --git a/pkg/registry/core/resourceclass/strategy.go b/pkg/registry/core/resourceclass/strategy.go
new file mode 100644
index 0000000000000..e18e9e155ec8d
--- /dev/null
+++ b/pkg/registry/core/resourceclass/strategy.go
@@ -0,0 +1,153 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceclass
+
+import (
+	"fmt"
+
+	//"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	//"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/validation/field"
+	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/registry/generic"
+	pkgstorage "k8s.io/apiserver/pkg/storage"
+	"k8s.io/apiserver/pkg/storage/names"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/validation"
+)
+
+// resourceclassStrategy implements behavior for resourceclasses
+type resourceclassStrategy struct {
+	runtime.ObjectTyper
+	names.NameGenerator
+}
+
+// Strategy is the default logic that applies when creating and updating ResourceClass
+// objects.
+var Strategy = resourceclassStrategy{api.Scheme, names.SimpleNameGenerator}
+
+// NamespaceScoped is false for resourceclasses.
+func (resourceclassStrategy) NamespaceScoped() bool {
+	return false
+}
+
+// AllowCreateOnUpdate is false for resourceclasses.
+func (resourceclassStrategy) AllowCreateOnUpdate() bool {
+	return false
+}
+
+// PrepareForCreate clears fields that are not allowed to be set by end users on creation.
+func (resourceclassStrategy) PrepareForCreate(ctx genericapirequest.Context, obj runtime.Object) {
+	_ = obj.(*api.ResourceClass)
+	// ResourceClasses allow *all* fields, including status, to be set on create.
+}
+
+// PrepareForUpdate clears fields that are not allowed to be set by end users on update.
+func (resourceclassStrategy) PrepareForUpdate(ctx genericapirequest.Context, obj, old runtime.Object) {
+	newResourceClass := obj.(*api.ResourceClass)
+	oldResourceClass := old.(*api.ResourceClass)
+	newResourceClass.Status = oldResourceClass.Status
+}
+
+// Validate validates a new resourceclass.
+func (resourceclassStrategy) Validate(ctx genericapirequest.Context, obj runtime.Object) field.ErrorList {
+	resourceclass := obj.(*api.ResourceClass)
+	return validation.ValidateResourceClass(resourceclass)
+}
+
+// Canonicalize normalizes the object after validation.
+func (resourceclassStrategy) Canonicalize(obj runtime.Object) {
+}
+
+// ValidateUpdate is the default update validation for an end user.
+func (resourceclassStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old runtime.Object) field.ErrorList {
+	errorList := validation.ValidateResourceClass(obj.(*api.ResourceClass))
+	return append(errorList, validation.ValidateResourceClassUpdate(obj.(*api.ResourceClass), old.(*api.ResourceClass))...)
+}
+
+func (ns resourceclassStrategy) Export(ctx genericapirequest.Context, obj runtime.Object, exact bool) error {
+	_, ok := obj.(*api.ResourceClass)
+	if !ok {
+		// unexpected programmer error
+		return fmt.Errorf("unexpected object: %v", obj)
+	}
+	ns.PrepareForCreate(ctx, obj)
+	if exact {
+		return nil
+	}
+	// ResourceClasses allow direct status edits, so nothing extra needs to be
+	// cleared here when exact is false; the exported object can be reused as is.
+	return nil
+}
+
+type resourceclassStatusStrategy struct {
+	resourceclassStrategy
+}
+
+var StatusStrategy = resourceclassStatusStrategy{Strategy}
+
+func (resourceclassStatusStrategy) PrepareForCreate(ctx genericapirequest.Context, obj runtime.Object) {
+	_ = obj.(*api.ResourceClass)
+	// ResourceClasses allow *all* fields, including status, to be set on create.
+}
+
+func (resourceclassStatusStrategy) PrepareForUpdate(ctx genericapirequest.Context, obj, old runtime.Object) {
+	newResourceClass := obj.(*api.ResourceClass)
+	oldResourceClass := old.(*api.ResourceClass)
+	newResourceClass.Spec = oldResourceClass.Spec
+}
+
+// Canonicalize normalizes the object after validation.
+func (resourceclassStatusStrategy) Canonicalize(obj runtime.Object) {
+}
+
+func (resourceclassStrategy) AllowUnconditionalUpdate() bool {
+	return true
+}
+
+// ResourceGetter is an interface for retrieving resources by ResourceLocation.
+type ResourceGetter interface {
+	Get(genericapirequest.Context, string, *metav1.GetOptions) (runtime.Object, error)
+}
+
+// ResourceClassToSelectableFields returns a field set that represents the object.
+func ResourceClassToSelectableFields(resourceclass *api.ResourceClass) fields.Set {
+	return generic.ObjectMetaFieldsSet(&resourceclass.ObjectMeta, false)
+}
+
+// GetAttrs returns labels and fields of a given object for filtering purposes.
+func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
+	resourceclassObj, ok := obj.(*api.ResourceClass)
+	if !ok {
+		return nil, nil, false, fmt.Errorf("not a resourceclass")
+	}
+	return labels.Set(resourceclassObj.ObjectMeta.Labels), ResourceClassToSelectableFields(resourceclassObj), resourceclassObj.Initializers != nil, nil
+}
+
+// MatchResourceClass returns a generic matcher for a given label and field selector.
+func MatchResourceClass(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate {
+	return pkgstorage.SelectionPredicate{
+		Label:       label,
+		Field:       field,
+		GetAttrs:    GetAttrs,
+		IndexFields: []string{"metadata.name"},
+	}
+}
diff --git a/pkg/registry/core/resourceclass/strategy_test.go b/pkg/registry/core/resourceclass/strategy_test.go
new file mode 100644
index 0000000000000..2cba99d0ae291
--- /dev/null
+++ b/pkg/registry/core/resourceclass/strategy_test.go
@@ -0,0 +1,56 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceclass
+
+import (
+	"testing"
+
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/kubernetes/pkg/api"
+	apitesting "k8s.io/kubernetes/pkg/api/testing"
+)
+
+func TestMatchResourceClass(t *testing.T) {
+	testFieldMap := map[bool][]fields.Set{
+		true: {
+			{"metadata.name": "foo"},
+		},
+		false: {
+			{"foo": "bar"},
+		},
+	}
+
+	for expectedResult, fieldSet := range testFieldMap {
+		for _, field := range fieldSet {
+			m := MatchResourceClass(labels.Everything(), field.AsSelector())
+			_, matchesSingle := m.MatchesSingle()
+			if e, a := expectedResult, matchesSingle; e != a {
+				t.Errorf("%+v: expected %v, got %v", fieldSet, e, a)
+			}
+		}
+	}
+}
+
+func TestSelectableFieldLabelConversions(t *testing.T) {
+	apitesting.TestSelectableFieldLabelConversionsOfKind(t,
+		api.Registry.GroupOrDie(api.GroupName).GroupVersion.String(),
+		"ResourceClass",
+		ResourceClassToSelectableFields(&api.ResourceClass{}),
+		nil,
+	)
+}
diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go
index e46a7cf333044..9e7e0b676918b 100644
--- a/pkg/registry/core/rest/storage_core.go
+++ b/pkg/registry/core/rest/storage_core.go
@@ -53,6 +53,7 @@ import (
 	podtemplatestore "k8s.io/kubernetes/pkg/registry/core/podtemplate/storage"
 	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
 	controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage"
+	resourceclassstore "k8s.io/kubernetes/pkg/registry/core/resourceclass/storage"
 	resourcequotastore "k8s.io/kubernetes/pkg/registry/core/resourcequota/storage"
 	secretstore "k8s.io/kubernetes/pkg/registry/core/secret/storage"
 	"k8s.io/kubernetes/pkg/registry/core/service"
@@ -118,6 +119,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
 	eventStorage := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
 	limitRangeStorage := limitrangestore.NewREST(restOptionsGetter)
 
+	resourceClassStorage, resourceClassStatusStorage := resourceclassstore.NewStorage(restOptionsGetter)
 	resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotastore.NewREST(restOptionsGetter)
 	secretStorage := secretstore.NewREST(restOptionsGetter)
 	serviceAccountStorage := serviceaccountstore.NewREST(restOptionsGetter)
@@ -208,6 +210,8 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
 		"events":      eventStorage,
 		"limitRanges": limitRangeStorage,
 
+		"resourceClasses":        resourceClassStorage,
+		"resourceClasses/status": resourceClassStatusStorage,
 		"resourceQuotas":         resourceQuotaStorage,
 		"resourceQuotas/status":  resourceQuotaStatusStorage,
 		"namespaces":             namespaceStorage,
diff --git a/pkg/util/node/node.go b/pkg/util/node/node.go
index a351667bf28bc..46d96aef3ce36 100644
--- a/pkg/util/node/node.go
+++ b/pkg/util/node/node.go
@@ -19,15 +19,20 @@ package node
 import (
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"net"
 	"os"
+	"path/filepath"
+	"sort"
 	"strings"
 	"time"
 
 	"github.com/golang/glog"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	utilyaml "k8s.io/apimachinery/pkg/util/yaml"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@@ -176,3 +181,126 @@ func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1
 	}
 	return updatedNode, nil
 }
+
+// ReadDeviceFiles reads device manifests from the given directory.
+func ReadDeviceFiles(path string) ([]*v1.Device, error) {
+	statInfo, err := os.Stat(path)
+	if err != nil {
+		// Tolerate a missing path (no devices configured); report all other
+		// stat errors instead of dereferencing a nil statInfo below.
+		if os.IsNotExist(err) {
+			return nil, nil
+		}
+		return nil, err
+	}
+	if !statInfo.Mode().IsDir() {
+		return nil, fmt.Errorf("device file path %q is not a directory", path)
+	}
+	return ExtractDevicesFromDir(path)
+}
+
+func ExtractDevicesFromDir(name string) ([]*v1.Device, error) {
+	glog.V(1).Infof("ExtractDevicesFromDir: Entered for dir %v", name)
+	dirents, err := filepath.Glob(filepath.Join(name, "[^.]*"))
+	glog.V(1).Infof("ExtractDevicesFromDir: dirents %v", dirents)
+	if err != nil {
+		return nil, fmt.Errorf("glob failed: %v", err)
+	}
+
+	devices := make([]*v1.Device, 0)
+	if len(dirents) == 0 {
+		return devices, nil
+	}
+
+	sort.Strings(dirents)
+	for _, path := range dirents {
+		statInfo, err := os.Stat(path)
+		if err != nil {
+			glog.V(1).Infof("Can't get metadata for %q: %v", path, err)
+			continue
+		}
+
+		switch {
+		case statInfo.Mode().IsDir():
+			glog.V(1).Infof("Not recursing into config path %q", path)
+		case statInfo.Mode().IsRegular():
+			device, err := ExtractDeviceFromFile(path)
+			if err != nil {
+				glog.V(1).Infof("Can't process config file %q: %v", path, err)
+			} else {
+				devices = append(devices, device)
+			}
+		default:
+			glog.V(1).Infof("Config path %q is not a directory or file: %v", path, statInfo.Mode())
+		}
+	}
+	return devices, nil
+}
+
+func ExtractDeviceFromFile(filename string) (device *v1.Device, err error) {
+	glog.V(3).Infof("Reading config file %v", filename)
+
+	file, err := os.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	defer file.Close()
+
+	data, err := ioutil.ReadAll(file)
+	if err != nil {
+		return device, err
+	}
+
+	//defaultFn := func(device *api.Device) error {
+	//	return s.applyDefaults(device, filename)
+	//}
+	glog.V(3).Infof("Going to decode device data %v", data)
+
+	parsed, device, deviceErr := TryDecodeSingleDevice(data)
+	if parsed {
+		if deviceErr != nil {
+			return device, deviceErr
+		}
+		return device, nil
+	}
+
+	return device, fmt.Errorf("%v: read %q, but couldn't parse as device: %v", filename, string(data), deviceErr)
+}
+
+func TryDecodeSingleDevice(data []byte) (parsed bool, device *v1.Device, err error) {
+	// JSON is valid YAML, so this should work for everything.
+	glog.V(3).Infof("TryDecodeSingleDevice Entered data:%v", data)
+
+	json, err := utilyaml.ToJSON(data)
+	if err != nil {
+		return false, nil, err
+	}
+	obj, err := runtime.Decode(api.Codecs.UniversalDecoder(), json)
+	if err != nil {
+		return false, device, err
+	}
+	glog.V(3).Infof("TryDecodeSingleDevice: getting api Device")
+	// Check whether the object could be converted to single Device.
+	if _, ok := obj.(*api.Device); !ok {
+		err = fmt.Errorf("invalid device: %#v", obj)
+		return false, device, err
+	}
+	newDevice := obj.(*api.Device)
+	// Apply default values and validate the device.
+	v1Device := &v1.Device{}
+	if err := v1.Convert_api_Device_To_v1_Device(newDevice, v1Device, nil); err != nil {
+		return true, nil, err
+	}
+	return true, v1Device, nil
+}
diff --git a/plugin/cmd/kube-scheduler/app/configurator.go b/plugin/cmd/kube-scheduler/app/configurator.go
index a40dbbb79765f..46bb8977e6d97 100644
--- a/plugin/cmd/kube-scheduler/app/configurator.go
+++ b/plugin/cmd/kube-scheduler/app/configurator.go
@@ -84,6 +84,7 @@ func CreateScheduler(
 	replicaSetInformer extensionsinformers.ReplicaSetInformer,
 	statefulSetInformer appsinformers.StatefulSetInformer,
 	serviceInformer coreinformers.ServiceInformer,
+	resClassInformer coreinformers.ResourceClassInformer,
 	recorder record.EventRecorder,
 ) (*scheduler.Scheduler, error) {
 	configurator := factory.NewConfigFactory(
@@ -97,6 +98,7 @@ func CreateScheduler(
 		replicaSetInformer,
 		statefulSetInformer,
 		serviceInformer,
+		resClassInformer,
 		s.HardPodAffinitySymmetricWeight,
 	)
diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go
index 653e3398f491b..1259cfa3d92ca 100644
--- a/plugin/cmd/kube-scheduler/app/server.go
+++ b/plugin/cmd/kube-scheduler/app/server.go
@@ -87,6 +87,7 @@ func Run(s *options.SchedulerServer) error {
 		informerFactory.Extensions().V1beta1().ReplicaSets(),
 		informerFactory.Apps().V1beta1().StatefulSets(),
 		informerFactory.Core().V1().Services(),
+		informerFactory.Core().V1().ResourceClasses(),
 		recorder,
 	)
 	if err != nil {
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
index 1697ad680d962..7a42795912fa7 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
@@ -106,6 +106,7 @@ func NodeRules() []rbac.PolicyRule {
 		// TODO: restrict to pods scheduled on the bound node once field selectors are supported by list/watch authorization
 		rbac.NewRule(Read...).Groups(legacyGroup).Resources("pods").RuleOrDie(),
+		rbac.NewRule(Read...).Groups(legacyGroup).Resources("resourceclasses").RuleOrDie(),
 
 		// Needed for the node to create/delete mirror pods.
 		// Use the NodeRestriction admission plugin to limit a node to creating/deleting mirror pods bound to itself.
@@ -327,12 +328,15 @@ func ClusterRoles() []rbac.ClusterRole {
 			rbac.NewRule(Read...).Groups(legacyGroup).Resources("nodes", "pods").RuleOrDie(),
 			rbac.NewRule("create").Groups(legacyGroup).Resources("pods/binding", "bindings").RuleOrDie(),
 			rbac.NewRule("update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
+			rbac.NewRule("patch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
 
 			// things that select pods
 			rbac.NewRule(Read...).Groups(legacyGroup).Resources("services", "replicationcontrollers").RuleOrDie(),
 			rbac.NewRule(Read...).Groups(extensionsGroup).Resources("replicasets").RuleOrDie(),
 			rbac.NewRule(Read...).Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
 
 			// things that pods use
 			rbac.NewRule(Read...).Groups(legacyGroup).Resources("persistentvolumeclaims", "persistentvolumes").RuleOrDie(),
+			rbac.NewRule("get", "update", "patch", "delete", "list", "watch").Groups(legacyGroup).Resources("resourceclasses").RuleOrDie(),
+			rbac.NewRule("update", "patch").Groups(legacyGroup).Resources("resourceclasses/status").RuleOrDie(),
 		},
 	},
 	{
diff --git a/plugin/pkg/scheduler/algorithm/predicates/metadata.go b/plugin/pkg/scheduler/algorithm/predicates/metadata.go
index 8858ba730f56f..64c4229e73b0f 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/metadata.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/metadata.go
@@ -45,10 +45,11 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
 	if err != nil {
 		return nil
 	}
+	resReq, _ := GetResourceRequest(pod)
 	predicateMetadata := &predicateMetadata{
 		pod:           pod,
 		podBestEffort: isPodBestEffort(pod),
-		podRequest:    GetResourceRequest(pod),
+		podRequest:    resReq,
 		podPorts:      schedutil.GetUsedPorts(pod),
 		matchingAntiAffinityTerms: matchingTerms,
 	}
diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
index 95939bf504351..f8f0a814616c0 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -62,6 +62,10 @@ type NodeInfo interface {
 	GetNodeInfo(nodeID string) (*v1.Node, error)
 }
 
+type ResClassInfo interface {
+	ListResClasses() ([]*v1.ResourceClass, error)
+}
+
 type PersistentVolumeInfo interface {
 	GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error)
 }
@@ -108,6 +112,22 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
 	return node, nil
 }
 
+type CachedResourceClassInfo struct {
+	corelisters.ResourceClassLister
+}
+
+// ListResClasses lists all resource classes from the scheduler's cache.
+func (c *CachedResourceClassInfo) ListResClasses() ([]*v1.ResourceClass, error) {
+	allResClasses, err := c.List(labels.Everything())
+	if err != nil {
+		return nil, fmt.Errorf("error retrieving resource class list from cache: %v", err)
+	}
+	return allResClasses, nil
+}
+
 // Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file
 // due to the way declarations are processed in predicate declaration unit tests.
 type matchingPodAntiAffinityTerm struct {
@@ -494,13 +514,16 @@ func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *s
 // C1:
 //	CPU: 2
 //	Memory: 1G
+//	nvidia-gpu: 2
+//	solarflare-40gig: 1
 // C2:
 //	CPU: 1
 //	Memory: 1G
 //
-// Result: CPU: 3, Memory: 3G
-func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
+// Result: CPU: 3, Memory: 3G, ['nvidia-gpu': 2, 'solarflare-40gig': 1]
+func GetResourceRequest(pod *v1.Pod) (*schedulercache.Resource, map[string]int32) {
 	result := schedulercache.Resource{}
+	rClasses := make(map[string]int32)
 	for _, container := range pod.Spec.Containers {
 		for rName, rQuantity := range container.Resources.Requests {
 			switch rName {
@@ -515,6 +538,8 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
 			default:
 				if v1helper.IsOpaqueIntResourceName(rName) {
 					result.AddOpaque(rName, rQuantity.Value())
+				} else {
+					// Any other non-standard resource name is treated as a
+					// resource class request; sum across containers.
+					rClasses[string(rName)] += int32(rQuantity.Value())
 				}
 			}
 		}
@@ -553,12 +578,18 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
 				value := rQuantity.Value()
 				if value > result.OpaqueIntResources[rName] {
 					result.SetOpaque(rName, value)
 				}
+			} else {
+				// Init containers run sequentially, so a resource class
+				// request only raises the pod-level request when it exceeds
+				// the maximum seen so far.
+				if value := int32(rQuantity.Value()); value > rClasses[string(rName)] {
+					rClasses[string(rName)] = value
+				}
 			}
 		}
 	}
-	return &result
+	return &result, rClasses
 }
 
 func podName(pod *v1.Pod) string {
@@ -578,17 +609,40 @@ func PodFitsResources(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.No
 	}
 
 	var podRequest *schedulercache.Resource
+	var rClasses map[string]int32
 	if predicateMeta, ok := meta.(*predicateMetadata); ok {
 		podRequest = predicateMeta.podRequest
+		// TODO: cache the resource class request in predicateMetadata
+		// instead of recomputing it here.
+		_, rClasses = GetResourceRequest(pod)
 	} else {
 		// We couldn't parse metadata - fallback to computing it.
-		podRequest = GetResourceRequest(pod)
+		podRequest, rClasses = GetResourceRequest(pod)
 	}
+
 	if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 && len(podRequest.OpaqueIntResources) == 0 && len(rClasses) == 0 {
 		return len(predicateFails) == 0, predicateFails, nil
 	}
 
 	allocatable := nodeInfo.AllocatableResource()
+	if len(rClasses) > 0 {
+		resClasses := nodeInfo.ResClasses()
+		for name, quantity := range rClasses {
+			// TODO: report a dedicated predicate failure, something like
+			// "NoDeviceForRequestedResourceClass".
+			rci, ok := resClasses[name]
+			if !ok {
+				predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceName(name), int64(quantity), 0, 0))
+				break
+			}
+			if rci.Requested+quantity > rci.Allocatable {
+				predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceName(name), int64(quantity), int64(rci.Requested), int64(rci.Allocatable)))
+				break
+			}
+		}
+	}
 	if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
 		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
 	}
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index 171b9f19e1f1f..1459f7f8417a8 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -76,6 +76,8 @@ type ConfigFactory struct {
 	serviceLister corelisters.ServiceLister
 	// a means to list all controllers
 	controllerLister corelisters.ReplicationControllerLister
+	// a means to list all resourceclasses
+	resClassLister corelisters.ResourceClassLister
 	// a means to list all replicasets
 	replicaSetLister extensionslisters.ReplicaSetLister
 	// a means to list all statefulsets
@@ -114,10 +116,15 @@ func NewConfigFactory(
 	replicaSetInformer extensionsinformers.ReplicaSetInformer,
 	statefulSetInformer appsinformers.StatefulSetInformer,
 	serviceInformer coreinformers.ServiceInformer,
+	resClassInformer coreinformers.ResourceClassInformer,
 	hardPodAffinitySymmetricWeight int,
 ) scheduler.Configurator {
 	stopEverything := make(chan struct{})
 	schedulerCache := schedulercache.New(30*time.Second, stopEverything)
+	if err := schedulerCache.AddClient(client); err != nil {
+		glog.Errorf("Error on client addition: %v", err)
+	}
 
 	c := &ConfigFactory{
 		client:                         client,
@@ -129,6 +136,7 @@ func NewConfigFactory(
 		controllerLister:  replicationControllerInformer.Lister(),
 		replicaSetLister:  replicaSetInformer.Lister(),
 		statefulSetLister: statefulSetInformer.Lister(),
+		resClassLister:    resClassInformer.Lister(),
 		schedulerCache:    schedulerCache,
 		StopEverything:    stopEverything,
 		schedulerName:     schedulerName,
@@ -203,6 +211,16 @@ func NewConfigFactory(
 
 	// TODO(harryz) need to fill all the handlers here and below for equivalence cache
 
+	resClassInformer.Informer().AddEventHandlerWithResyncPeriod(
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: c.addResourceClassToCache,
+			//TODO: Implement
+			//UpdateFunc: c.updateResourceClassInCache,
+			//DeleteFunc: c.deleteResourceClassFromCache,
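+			// NOTE: until the update/delete handlers above are implemented,
+			// a changed or deleted ResourceClass lingers in the scheduler
+			// cache until the scheduler restarts.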
+		},
+		0,
+	)
+
 	return c
 }
@@ -280,6 +298,18 @@ func (c *ConfigFactory) deletePodFromCache(obj interface{}) {
 	}
 }
 
+func (c *ConfigFactory) addResourceClassToCache(obj interface{}) {
+	rClass, ok := obj.(*v1.ResourceClass)
+	if !ok {
+		glog.Errorf("cannot convert to *v1.ResourceClass: %v", obj)
+		return
+	}
+
+	if err := c.schedulerCache.AddResourceClass(rClass); err != nil {
+		glog.Errorf("scheduler cache AddResourceClass failed: %v", err)
+	}
+}
+
 func (c *ConfigFactory) addNodeToCache(obj interface{}) {
 	node, ok := obj.(*v1.Node)
 	if !ok {
diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go
index 3b3a69f8a38a0..c9244a0e65239 100644
--- a/plugin/pkg/scheduler/schedulercache/cache.go
+++ b/plugin/pkg/scheduler/schedulercache/cache.go
@@ -25,6 +26,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/api/v1"
+	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 )
@@ -42,9 +44,10 @@ func New(ttl time.Duration, stop <-chan struct{}) Cache {
 }
 
 type schedulerCache struct {
-	stop   <-chan struct{}
-	ttl    time.Duration
-	period time.Duration
+	kubeClient clientset.Interface
+	stop       <-chan struct{}
+	ttl        time.Duration
+	period     time.Duration
 
 	// This mutex guards all fields within this cache struct.
 	mu sync.Mutex
@@ -52,8 +55,9 @@ type schedulerCache struct {
 	// The key could further be used to get an entry in podStates.
 	assumedPods map[string]bool
 	// a map from pod key to podState.
-	podStates map[string]*podState
-	nodes     map[string]*NodeInfo
+	podStates       map[string]*podState
+	nodes           map[string]*NodeInfo
+	resourceClasses map[string]*ResourceClassInfo
 }
 
 type podState struct {
@@ -66,13 +70,13 @@ type podState struct {
 
 func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
 	return &schedulerCache{
-		ttl:    ttl,
-		period: period,
-		stop:   stop,
-
-		nodes:       make(map[string]*NodeInfo),
-		assumedPods: make(map[string]bool),
-		podStates:   make(map[string]*podState),
+		ttl:             ttl,
+		period:          period,
+		stop:            stop,
+		nodes:           make(map[string]*NodeInfo),
+		assumedPods:     make(map[string]bool),
+		podStates:       make(map[string]*podState),
+		resourceClasses: make(map[string]*ResourceClassInfo),
 	}
 }
@@ -186,8 +190,29 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) {
 	if !ok {
 		n = NewNodeInfo()
 		cache.nodes[pod.Spec.NodeName] = n
+		n.kubeClient = cache.kubeClient
 	}
 	n.addPod(pod)
+	cache.onAddPodHandleResClasses(pod)
+}
+
+// onAddPodHandleResClasses maps the pod's resource class requests to devices
+// on its node, annotates the pod with the chosen devices, and updates the
+// status of every affected resource class.
+func (cache *schedulerCache) onAddPodHandleResClasses(pod *v1.Pod) {
+	n, ok := cache.nodes[pod.Spec.NodeName]
+	if !ok {
+		return
+	}
+	deviceMappings, resClassRequests, _ := n.OnAddUpdateResClassToDeviceMappingForPod(pod)
+	for _, mapping := range deviceMappings {
+		annotKey := v1.ResClassPodAnnotationKeyPrefix + "_" + mapping.rClassName + "_" + mapping.deviceName
+		n.patchPodWithDeviceAnnotation(pod, annotKey, mapping.deviceQuantity)
+	}
+	for rClassName, request := range resClassRequests {
+		cacheRCInfo, ok := cache.resourceClasses[rClassName]
+		if !ok {
+			continue
+		}
+		cacheRCInfo.Requested += request
+		n.patchResourceClassStatus(cacheRCInfo.resClass, cacheRCInfo.Allocatable, cacheRCInfo.Requested)
+	}
 }
 
 // Assumes that lock is already acquired.
@@ -205,12 +230,30 @@ func (cache *schedulerCache) removePod(pod *v1.Pod) error {
 	if err := n.removePod(pod); err != nil {
 		return err
 	}
+	if err := cache.onRemovePodHandleResClasses(pod); err != nil {
+		return err
+	}
+
 	if len(n.pods) == 0 && n.node == nil {
 		delete(cache.nodes, pod.Spec.NodeName)
 	}
 	return nil
 }
 
+func (cache *schedulerCache) onRemovePodHandleResClasses(pod *v1.Pod) error {
+	n, ok := cache.nodes[pod.Spec.NodeName]
+	if !ok {
+		return fmt.Errorf("node %v not found in cache", pod.Spec.NodeName)
+	}
+	resClassRequests, err := n.OnRemoveUpdateResClassToDeviceMappingForPod(pod)
+	if err != nil {
+		return err
+	}
+	for rClassName, request := range resClassRequests {
+		cacheRCInfo, ok := cache.resourceClasses[rClassName]
+		if !ok {
+			continue
+		}
+		cacheRCInfo.Requested -= request
+		n.patchResourceClassStatus(cacheRCInfo.resClass, cacheRCInfo.Allocatable, cacheRCInfo.Requested)
+	}
+	return nil
+}
+
 func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
 	key, err := getPodKey(pod)
 	if err != nil {
@@ -301,14 +344,74 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
 	return nil
 }
 
+func (cache *schedulerCache) AddResourceClass(rClass *v1.ResourceClass) error {
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+	if _, ok := cache.resourceClasses[rClass.Name]; ok {
+		return nil
+	}
+	rc := &ResourceClassInfo{resClass: rClass}
+	cache.resourceClasses[rClass.Name] = rc
+	for _, info := range cache.nodes {
+		rcPerNodeInfo, err := info.AddResourceClass(rClass, info.node)
+		if err != nil {
+			return err
+		}
+		if rcPerNodeInfo == nil {
+			// No matching device on this node.
+			continue
+		}
+		rc.Allocatable += rcPerNodeInfo.Allocatable
+		rc.Requested += rcPerNodeInfo.Requested
+		info.patchResourceClassStatus(rc.resClass, rc.Allocatable, rc.Requested)
+	}
+	return nil
+}
+
+// AddClient attaches the clientset used to patch pods and resource class status.
+func (cache *schedulerCache) AddClient(client clientset.Interface) error {
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+	cache.kubeClient = client
+	return nil
+}
+
 func (cache *schedulerCache) AddNode(node *v1.Node) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 	n, ok := cache.nodes[node.Name]
 	if !ok {
 		n = NewNodeInfo()
 		cache.nodes[node.Name] = n
+		n.kubeClient = cache.kubeClient
 	}
+	for _, rc := range cache.resourceClasses {
+		rcPerNodeInfo, err := n.AddResourceClass(rc.resClass, node)
+		if err != nil {
+			return err
+		}
+		if rcPerNodeInfo == nil {
+			continue
+		}
+		rc.Allocatable += rcPerNodeInfo.Allocatable
+		rc.Requested += rcPerNodeInfo.Requested
+		n.patchResourceClassStatus(rc.resClass, rc.Allocatable, rc.Requested)
+	}
 	return n.SetNode(node)
 }
@@ -321,6 +424,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	if !ok {
 		n = NewNodeInfo()
 		cache.nodes[newNode.Name] = n
+		n.kubeClient = cache.kubeClient
 	}
 	return n.SetNode(newNode)
 }
diff --git a/plugin/pkg/scheduler/schedulercache/interface.go b/plugin/pkg/scheduler/schedulercache/interface.go
index 521fecfd79bdf..4007d683dd1ac 100644
--- a/plugin/pkg/scheduler/schedulercache/interface.go
+++ b/plugin/pkg/scheduler/schedulercache/interface.go
@@ -19,6 +19,7 @@ package schedulercache
 import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/kubernetes/pkg/api/v1"
+	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 )
 
 // Cache collects pods' information and provides node-level aggregated information.
@@ -80,6 +81,9 @@ type Cache interface {
 	// AddNode adds overall information about node.
 	AddNode(node *v1.Node) error
 
+	// AddResourceClass adds overall information about a resource class.
+	AddResourceClass(rClass *v1.ResourceClass) error
+
 	// UpdateNode updates overall information about node.
 	UpdateNode(oldNode, newNode *v1.Node) error
 
@@ -93,4 +97,5 @@ type Cache interface {
 	// List lists all cached pods (including assumed ones).
 	List(labels.Selector) ([]*v1.Pod, error)
 
+	// AddClient attaches the clientset the cache uses to patch API objects.
+	AddClient(client clientset.Interface) error
 }
diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go
index 4717b1d8e8c1c..11776bfc90262 100644
--- a/plugin/pkg/scheduler/schedulercache/node_info.go
+++ b/plugin/pkg/scheduler/schedulercache/node_info.go
@@ -17,24 +17,56 @@ limitations under the License.
 package schedulercache
 
 import (
+	"errors"
 	"fmt"
+	"strconv"
+	"strings"
 
 	"github.com/golang/glog"
 
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/json"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	clientcache "k8s.io/client-go/tools/cache"
 	"k8s.io/kubernetes/pkg/api/v1"
 	v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
+	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
 )
 
 var emptyResource = Resource{}
 
+// DeviceInfo is the scheduler cache's view of a single device type on a node.
+type DeviceInfo struct {
+	requested             int32
+	allocatable           int32
+	groupResourceName     string
+	remainderFromGroup    int32
+	subResName            string
+	subResQuantity        int32
+	targetResourceClasses map[string]*ResourceClassInfo
+}
+
+// ResourceClassInfo tracks allocatable and requested totals for one resource class.
+type ResourceClassInfo struct {
+	resClass    *v1.ResourceClass
+	Requested   int32
+	Allocatable int32
+	subResCount int32
+}
+
+// ResourceClassDeviceAllocation records which device satisfied a resource
+// class request, and by how much.
+type ResourceClassDeviceAllocation struct {
+	rClassName     string
+	deviceName     string
+	deviceQuantity int32
+}
+
 // NodeInfo is node level aggregated information.
 type NodeInfo struct {
 	// Overall node information.
-	node *v1.Node
-
+	node       *v1.Node
+	kubeClient clientset.Interface
 	pods             []*v1.Pod
 	podsWithAffinity []*v1.Pod
 	usedPorts        map[int]bool
@@ -47,6 +79,10 @@ type NodeInfo struct {
 	// We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
 	// as int64, to avoid conversions and accessing map.
 	allocatableResource *Resource
+
+	devices    map[string]*DeviceInfo
+	resClasses map[string]*ResourceClassInfo
+
 	// We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
 	// explicitly as int, to avoid conversions and improve performance.
 	allowedPodNumber int
@@ -116,6 +152,108 @@ func (r *Resource) SetOpaque(name v1.ResourceName, quantity int64) {
 	r.OpaqueIntResources[name] = quantity
 }
 
+// AddResourceClass registers a resource class against this node's devices.
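+// It walks the node's allocatable devices, records each one in the device
+// map, and returns a per-node ResourceClassInfo for the first device whose
+// labels match the class's resource selector (or nil when nothing matches).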
+func (n *NodeInfo) AddResourceClass(rClass *v1.ResourceClass, node *v1.Node) (*ResourceClassInfo, error) {
+	rcSpec := rClass.Spec
+	for _, device := range node.Status.AllocatableDevices {
+		if _, ok := n.devices[device.Name]; !ok {
+			var subResourceQuantity int32
+			var subResourceName string
+			// Check if this device is a higher-level device formed by grouping
+			// other devices based on some property, for example nvlink or
+			// node locality.
+			if device.SubResources.Quantity != 0 {
+				subResourceQuantity = device.SubResources.Quantity
+				subResourceName = device.SubResources.Name
+				// Update the group member device with a back reference to this device.
+				dInfo, ok := n.devices[subResourceName]
+				if !ok {
+					// API validation should make this unreachable.
+					return nil, fmt.Errorf("sub resource %v for the higher-level device %v not found in cache", subResourceName, device.Name)
+				}
+				dInfo.groupResourceName = device.Name
+			}
+
+			n.devices[device.Name] = &DeviceInfo{
+				allocatable:           device.Quantity,
+				subResName:            subResourceName,
+				subResQuantity:        subResourceQuantity,
+				targetResourceClasses: make(map[string]*ResourceClassInfo),
+			}
+		}
+
+		if rcSpec.SubResourcesCount > 0 && device.SubResources.Quantity < rcSpec.SubResourcesCount {
+			// The device groups fewer sub-devices than the class asks for.
+			// Example: the class wants 4 GPUs with node locality, but this
+			// device only groups 2, so skip it.
+			continue
+		}
+		if !rcMatchesResourcePropertySelectors(device, rcSpec.ResourceSelector) {
+			// Go to the next device.
+			continue
+		}
+		// The device matches the properties required by the resource class.
+		rcInfo := &ResourceClassInfo{
+			resClass:    rClass,
+			Allocatable: device.Quantity,
+			subResCount: rcSpec.SubResourcesCount,
+		}
+		n.resClasses[rClass.Name] = rcInfo
+
+		// Update the device-to-resource-class mapping for this node.
+		if err := n.updateDeviceName2ResourceClassesMap(device.Name, rcInfo); err != nil {
+			glog.Errorf("ERROR: updateDeviceName2ResourceClassesMap, %v", err)
+			return nil, err
+		}
+		return rcInfo, nil
+	}
+	return nil, nil
+}
+
+func (n *NodeInfo) updateDeviceName2ResourceClassesMap(deviceName string, rcInfo *ResourceClassInfo) error {
+	info, ok := n.devices[deviceName]
+	if !ok {
+		// Every device is registered in AddResourceClass before this is called.
+		return fmt.Errorf("device %v not found in scheduler cache", deviceName)
+	}
+	info.targetResourceClasses[rcInfo.resClass.Name] = rcInfo
+	return nil
+}
+
+// rcMatchesResourcePropertySelectors reports whether the device's labels
+// match any of the given selectors (the selectors are ORed).
+func rcMatchesResourcePropertySelectors(device v1.Device, resSelectors []v1.ResourcePropertySelector) bool {
+	for _, req := range resSelectors {
+		//TODO: Instead of using NodeSelector helper api, write and use resource class specific one.
+		resSelector, err := v1helper.ResourceSelectorRequirementsAsSelector(req.MatchExpressions)
+		if err != nil {
+			glog.Warningf("Failed to parse MatchExpressions: %+v, regarding as not match: %v", req.MatchExpressions, err)
+			return false
+		}
+		if resSelector.Matches(labels.Set(device.Labels)) {
+			return true
+		}
+	}
+	return false
+}
+
 // NewNodeInfo returns a ready to use empty NodeInfo object.
 // If any pods are given in arguments, their information will be aggregated in
 // the returned object.
@@ -127,6 +265,8 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
 		allowedPodNumber: 0,
 		generation:       0,
 		usedPorts:        make(map[int]bool),
+		devices:          make(map[string]*DeviceInfo),
+		resClasses:       make(map[string]*ResourceClassInfo),
 	}
 	for _, pod := range pods {
 		ni.addPod(pod)
@@ -142,6 +282,14 @@ func (n *NodeInfo) Node() *v1.Node {
 	return n.node
 }
 
+// ResClasses returns all resource class info on this node.
+func (n *NodeInfo) ResClasses() map[string]*ResourceClassInfo {
+	if n == nil {
+		return nil
+	}
+	return n.resClasses
+}
+
 // Pods return all pods scheduled (including assumed to be) on this node.
 func (n *NodeInfo) Pods() []*v1.Pod {
 	if n == nil {
@@ -217,6 +365,14 @@ func (n *NodeInfo) AllocatableResource() Resource {
 	return *n.allocatableResource
 }
 
+// AllocatableDevices returns the allocatable devices on this node.
+func (n *NodeInfo) AllocatableDevices() map[string]*DeviceInfo {
+	if n == nil {
+		return make(map[string]*DeviceInfo)
+	}
+	return n.devices
+}
+
 func (n *NodeInfo) Clone() *NodeInfo {
 	clone := &NodeInfo{
 		node:                    n.node,
@@ -228,6 +384,8 @@ func (n *NodeInfo) Clone() *NodeInfo {
 		memoryPressureCondition: n.memoryPressureCondition,
 		diskPressureCondition:   n.diskPressureCondition,
 		usedPorts:               make(map[int]bool),
+		devices:                 make(map[string]*DeviceInfo),
+		resClasses:              make(map[string]*ResourceClassInfo),
 		generation:              n.generation,
 	}
 	if len(n.pods) > 0 {
@@ -238,6 +396,17 @@ func (n *NodeInfo) Clone() *NodeInfo {
 			clone.usedPorts[k] = v
 		}
 	}
+	// Note: DeviceInfo and ResourceClassInfo values are pointers, so clones
+	// share the underlying structs with the original NodeInfo.
+	if len(n.devices) > 0 {
+		for k, v := range n.devices {
+			clone.devices[k] = v
+		}
+	}
+	if len(n.resClasses) > 0 {
+		for k, v := range n.resClasses {
+			clone.resClasses[k] = v
+		}
+	}
 	if len(n.podsWithAffinity) > 0 {
 		clone.podsWithAffinity = append([]*v1.Pod(nil), n.podsWithAffinity...)
 	}
@@ -261,9 +430,223 @@ func hasPodAffinityConstraints(pod *v1.Pod) bool {
 	return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
 }
 
+// onAddUpdateDependentResClasses charges quantityReq of device dName to every
+// resource class that targets the device, and propagates the charge to the
+// grouped sub-device (or to the group device this device belongs to).
+func (n *NodeInfo) onAddUpdateDependentResClasses(dName string, quantityReq int32) (map[string]int32, error) {
+	device := n.devices[dName]
+	associatedClasses := make(map[string]int32)
+	for _, rcInfo := range device.targetResourceClasses {
+		rcInfo.Requested += quantityReq
+		associatedClasses[rcInfo.resClass.Name] = quantityReq
+	}
+	if device.subResName != "" && device.subResQuantity != 0 {
+		// This device is a group; charge its member device as well.
+		associatedDeviceInfo := n.devices[device.subResName]
+		normalizedReq := quantityReq * device.subResQuantity
+		associatedDeviceInfo.requested += normalizedReq
+		for _, rcInfo := range associatedDeviceInfo.targetResourceClasses {
+			rcInfo.Requested += normalizedReq
+			associatedClasses[rcInfo.resClass.Name] = normalizedReq
+		}
+	} else if associatedDeviceInfo, ok := n.devices[device.groupResourceName]; ok {
+		// This device is a member of a group; charge the group device.
+		if associatedDeviceInfo.remainderFromGroup > quantityReq {
+			associatedDeviceInfo.remainderFromGroup -= quantityReq
+		} else {
+			quantityReq -= associatedDeviceInfo.remainderFromGroup
+			remainder := quantityReq % associatedDeviceInfo.subResQuantity
+			normalizedReq := quantityReq / associatedDeviceInfo.subResQuantity
+			if remainder != 0 {
+				normalizedReq++
+				associatedDeviceInfo.remainderFromGroup = remainder
+			}
+			associatedDeviceInfo.requested += normalizedReq
+			for _, rcInfo := range associatedDeviceInfo.targetResourceClasses {
+				rcInfo.Requested += normalizedReq
+				associatedClasses[rcInfo.resClass.Name] = normalizedReq
+			}
+		}
+	}
+	return associatedClasses, nil
+}
+
+// onRemoveUpdateDependentResClasses is the inverse of
+// onAddUpdateDependentResClasses: it releases quantityReq of device dName
+// from every dependent resource class.
+func (n *NodeInfo) onRemoveUpdateDependentResClasses(dName string, quantityReq int32) (map[string]int32, error) {
+	device := n.devices[dName]
+	associatedClasses := make(map[string]int32)
+	for _, rcInfo := range device.targetResourceClasses {
+		rcInfo.Requested -= quantityReq
+		associatedClasses[rcInfo.resClass.Name] = quantityReq
+	}
+
+	// Update sub-devices and their dependent classes.
+	if device.subResName != "" && device.subResQuantity != 0 {
+		associatedDeviceInfo := n.devices[device.subResName]
+		normalizedReq := quantityReq * device.subResQuantity
+		associatedDeviceInfo.requested -= normalizedReq
+		for _, rcInfo := range associatedDeviceInfo.targetResourceClasses {
+			rcInfo.Requested -= normalizedReq
+			associatedClasses[rcInfo.resClass.Name] = normalizedReq
+		}
+	} else if associatedDeviceInfo, ok := n.devices[device.groupResourceName]; ok {
+		// Update the group device to which this device belongs.
+		if associatedDeviceInfo.remainderFromGroup+quantityReq < associatedDeviceInfo.subResQuantity {
+			associatedDeviceInfo.remainderFromGroup += quantityReq
+		} else {
+			quantityReq += associatedDeviceInfo.remainderFromGroup
+			remainder := quantityReq % associatedDeviceInfo.subResQuantity
+			normalizedReq := quantityReq / associatedDeviceInfo.subResQuantity
+			if remainder != 0 {
+				associatedDeviceInfo.remainderFromGroup = remainder
+			}
+			associatedDeviceInfo.requested -= normalizedReq
+			for _, rcInfo := range associatedDeviceInfo.targetResourceClasses {
+				rcInfo.Requested -= normalizedReq
+				associatedClasses[rcInfo.resClass.Name] = normalizedReq
+			}
+		}
+	}
+	return associatedClasses, nil
+}
+
+func (n *NodeInfo) patchResourceClassStatus(rClass *v1.ResourceClass, allocatable int32, requested int32) error {
+	oldData, err := json.Marshal(rClass)
+	if err != nil {
+		return err
+	}
+	rClass.Status.Allocatable = allocatable
+	rClass.Status.Request = requested
+	newData, err := json.Marshal(rClass)
+	if err != nil {
+		return err
+	}
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.ResourceClass{})
+	if err != nil {
+		return err
+	}
+	if _, err := n.kubeClient.Core().ResourceClasses().Patch(rClass.Name, types.StrategicMergePatchType, patchBytes, "status"); err != nil {
+		glog.V(10).Infof("Failed to patch status for %s: %v", rClass.Name, err)
+		return err
+	}
+	glog.V(10).Infof("Patched status for resource class %s with %v", rClass.Name, requested)
+	return nil
+}
+
+func setIntAnnotation(pod *v1.Pod, annotationKey string, value int) {
+	if pod.Annotations == nil {
+		pod.Annotations = make(map[string]string)
+	}
+	pod.Annotations[annotationKey] = strconv.Itoa(value)
+}
+
+func (n *NodeInfo) patchPodWithDeviceAnnotation(pod *v1.Pod, annotationKey string, value int32) error {
+	oldData, err := json.Marshal(pod)
+	if err != nil {
+		return err
+	}
+	setIntAnnotation(pod, annotationKey, int(value))
+	newData, err := json.Marshal(pod)
+	if err != nil {
+		return err
+	}
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
+	if err != nil {
+		return err
+	}
+	if _, err := n.kubeClient.Core().Pods(pod.Namespace).Patch(pod.Name, types.StrategicMergePatchType, patchBytes); err != nil {
+		glog.V(10).Infof("Failed to add device annotation for pod %s: %v", pod.Name, err)
+		return err
+	}
+	glog.V(10).Infof("Added device annotation %s for pod %s to %v", annotationKey, pod.Name, value)
+	return nil
+}
+
+// OnRemoveUpdateResClassToDeviceMappingForPod releases the devices recorded in
+// the pod's annotations and returns the per-class quantities that were freed.
+func (n *NodeInfo) OnRemoveUpdateResClassToDeviceMappingForPod(pod *v1.Pod) (map[string]int32, error) {
+	_, rClasses, _, _ := calculateResource(pod)
+	allDependentClasses := make(map[string]int32)
+	if len(rClasses) > 0 {
+		devices, _ := getDeviceNameQuantityFromPodAnnotations(pod)
+		for dName, quantity := range devices {
+			dInfo, ok := n.devices[dName]
+			if !ok {
+				continue
+			}
+			dInfo.requested -= quantity
+			dependentClasses, err := n.onRemoveUpdateDependentResClasses(dName, quantity)
+			if err != nil {
+				return nil, err
+			}
+			for k, v := range dependentClasses {
+				allDependentClasses[k] = v
+			}
+		}
+	}
+	return allDependentClasses, nil
+}
+
+// OnAddUpdateResClassToDeviceMappingForPod picks a device for each resource
+// class the pod requests and updates all classes affected by that choice.
+func (n *NodeInfo) OnAddUpdateResClassToDeviceMappingForPod(pod *v1.Pod) ([]*ResourceClassDeviceAllocation, map[string]int32, error) {
+	allMappings := make([]*ResourceClassDeviceAllocation, 0)
+	allDependentClasses := make(map[string]int32)
+	_, rClasses, _, _ := calculateResource(pod)
+	for name, quantity := range rClasses {
+		deviceFound := false
+		for dName, dInfo := range n.devices {
+			if _, ok := dInfo.targetResourceClasses[name]; !ok {
+				continue
+			}
+			deviceFound = true
+			// 1. Pick an appropriate device from the devices on this node.
+			// 2. After selecting it, update all other resource classes that
+			//    may be impacted by this device's consumption.
+			dependentClasses, err := n.onAddUpdateDependentResClasses(dName, quantity)
+			if err != nil {
+				glog.Errorf("Error in syncing resource classes: %v", err)
+				continue
+			}
+			for k, v := range dependentClasses {
+				allDependentClasses[k] = v
+			}
+			allMappings = append(allMappings, &ResourceClassDeviceAllocation{
+				rClassName:     name,
+				deviceName:     dName,
+				deviceQuantity: quantity,
+			})
+			dInfo.requested += quantity
+			// The request is satisfied by this device; move on to the next class.
+			break
+		}
+		if !deviceFound {
+			glog.Errorf("ResourceClass info not found in cache. Maybe the cache has not fully initialized yet.")
+			return nil, nil, errors.New("resource class info not found in cache")
+		}
+	}
+	return allMappings, allDependentClasses, nil
+}
+
 // addPod adds pod information to this NodeInfo.
 func (n *NodeInfo) addPod(pod *v1.Pod) {
-	res, non0_cpu, non0_mem := calculateResource(pod)
+	res, _, non0_cpu, non0_mem := calculateResource(pod)
 	n.requestedResource.MilliCPU += res.MilliCPU
 	n.requestedResource.Memory += res.Memory
 	n.requestedResource.NvidiaGPU += res.NvidiaGPU
@@ -288,6 +671,36 @@ func (n *NodeInfo) addPod(pod *v1.Pod) {
 	n.generation++
 }
 
+func annotationKeyHasResClassPrefix(key string) bool {
+	return strings.HasPrefix(key, v1.ResClassPodAnnotationKeyPrefix)
+}
+
+func extractDeviceNameFromAnnotationKey(key string) string {
+	rclassDevice := strings.SplitAfter(key, v1.ResClassPodAnnotationKeyPrefix)[1]
+	splitted := strings.SplitAfter(rclassDevice, "_")
+	return splitted[len(splitted)-1]
+}
+
+// getDeviceNameQuantityFromPodAnnotations recovers the device quantities
+// recorded on the pod by patchPodWithDeviceAnnotation.
+func getDeviceNameQuantityFromPodAnnotations(pod *v1.Pod) (map[string]int32, bool) {
+	devices := make(map[string]int32)
+	if pod.Annotations == nil {
+		return devices, false
+	}
+	for k, v := range pod.Annotations {
+		if !annotationKeyHasResClassPrefix(k) {
+			continue
+		}
+		deviceName := extractDeviceNameFromAnnotationKey(k)
+		intValue, err := strconv.Atoi(v)
+		if err != nil {
+			glog.Warningf("Cannot convert the value %q with annotation key %q for the pod %q",
+				v, k, pod.Name)
+			return devices, false
+		}
+		devices[deviceName] = int32(intValue)
+	}
+	return devices, true
+}
+
 // removePod subtracts pod information from this NodeInfo.
 func (n *NodeInfo) removePod(pod *v1.Pod) error {
 	k1, err := getPodKey(pod)
@@ -319,7 +732,7 @@ func (n *NodeInfo) removePod(pod *v1.Pod) error {
 			n.pods[i] = n.pods[len(n.pods)-1]
 			n.pods = n.pods[:len(n.pods)-1]
 			// reduce the resource data
-			res, non0_cpu, non0_mem := calculateResource(pod)
+			res, _, non0_cpu, non0_mem := calculateResource(pod)
 
 			n.requestedResource.MilliCPU -= res.MilliCPU
 			n.requestedResource.Memory -= res.Memory
@@ -344,7 +757,8 @@ func (n *NodeInfo) removePod(pod *v1.Pod) error {
 	return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name)
 }
 
-func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int64) {
+func calculateResource(pod *v1.Pod) (res Resource, rClasses map[string]int32, non0_cpu int64, non0_mem int64) {
+	rClasses = make(map[string]int32)
 	for _, c := range pod.Spec.Containers {
 		for rName, rQuant := range c.Resources.Requests {
 			switch rName {
@@ -359,6 +773,8 @@ func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int6
 			default:
 				if v1helper.IsOpaqueIntResourceName(rName) {
 					res.AddOpaque(rName, rQuant.Value())
+				} else {
+					rClasses[string(rName)] = int32(rQuant.Value())
 				}
 			}
 		}
@@ -395,7 +811,10 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, used bool) {
 }
 
 // Sets the overall node information.
-func (n *NodeInfo) SetNode(node *v1.Node) error {
+// Any pods passed in are aggregated into the NodeInfo before the node is set.
+func (n *NodeInfo) SetNode(node *v1.Node, pods ...*v1.Pod) error {
+	for _, pod := range pods {
+		n.addPod(pod)
+	}
 	n.node = node
 	for rName, rQuant := range node.Status.Allocatable {
 		switch rName {
diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
index 2dcea903a0a58..1d315980ae3e9 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
@@ -469,7 +469,6 @@ func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest.
 	if err != nil {
 		return nil, false, err
 	}
-
 	var (
 		creatingObj runtime.Object
 		creating    = false
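
For reference, a minimal sketch (not part of the patch) of how the new accounting is exercised end to end, assuming the scheduler packages modified above. A container request whose resource name is neither a standard resource nor an opaque integer resource is treated as a resource class request; PodFitsResources then checks it against the per-node ResourceClassInfo built from matching devices. The class name "my.rc2" follows the example manifest earlier in this PR.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)

func main() {
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "cuda-job",
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						// Interpreted as a resource class request by
						// GetResourceRequest's default branch.
						v1.ResourceName("my.rc2"): resource.MustParse("2"),
					},
				},
			}},
		},
	}
	_, rClasses := predicates.GetResourceRequest(pod)
	// Prints 2: PodFitsResources compares this against nodeInfo.ResClasses(),
	// which AddResourceClass populated from the node's allocatable devices
	// whose labels matched the class's resourceSelector.
	fmt.Println(rClasses["my.rc2"])
}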