Skip to content

Commit

Permalink
Add support for affinity matching
Browse files Browse the repository at this point in the history
  • Loading branch information
ahg-g committed Feb 25, 2022
1 parent ee4aa69 commit 8045f64
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 1 deletion.
22 changes: 21 additions & 1 deletion pkg/capacity/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
Expand Down Expand Up @@ -77,6 +78,7 @@ type Capacity struct {
RequestableResources map[corev1.ResourceName][]FlavorInfo
UsedResources Resources
Workloads map[string]*workload.Info
LabelKeys map[corev1.ResourceName]sets.String
}

// FlavorInfo holds processed flavor type.
Expand All @@ -85,6 +87,7 @@ type FlavorInfo struct {
Guaranteed int64
Ceiling int64
Taints []corev1.Taint
Labels map[string]string
}

func NewCapacity(cap *kueue.Capacity) *Capacity {
Expand All @@ -93,19 +96,28 @@ func NewCapacity(cap *kueue.Capacity) *Capacity {
RequestableResources: resourcesByName(cap.Spec.RequestableResources),
UsedResources: make(Resources, len(cap.Spec.RequestableResources)),
Workloads: map[string]*workload.Info{},
LabelKeys: map[corev1.ResourceName]sets.String{},
}

for _, r := range cap.Spec.RequestableResources {
if len(r.Flavors) == 0 {
continue
}

labelKeys := sets.NewString()
ts := make(map[string]int64, len(r.Flavors))
for _, t := range r.Flavors {
for k := range t.Labels {
labelKeys.Insert(k)
}
ts[t.Name] = 0
}
if len(labelKeys) != 0 {
c.LabelKeys[r.Name] = labelKeys
}
c.UsedResources[r.Name] = ts
}

return c
}

Expand Down Expand Up @@ -351,12 +363,20 @@ func resourcesByName(in []kueue.Resource) map[corev1.ResourceName][]FlavorInfo {
flavors := make([]FlavorInfo, len(r.Flavors))
for i := range flavors {
f := &r.Flavors[i]
flavors[i] = FlavorInfo{
fInfo := FlavorInfo{
Name: f.Name,
Guaranteed: workload.ResourceValue(r.Name, f.Quota.Guaranteed),
Ceiling: workload.ResourceValue(r.Name, f.Quota.Ceiling),
Taints: append([]corev1.Taint(nil), f.Taints...),
}
if len(f.Labels) != 0 {
fInfo.Labels = map[string]string{}
for k, v := range f.Labels {
fInfo.Labels[k] = v
}
}
flavors[i] = fInfo

}
out[r.Name] = flavors
}
Expand Down
50 changes: 50 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -241,12 +243,19 @@ func (s *Scheduler) assign(ctx context.Context, e *entry) error {
// If it finds a flavor, also returns any borrowing required.
func findFlavorForResource(name corev1.ResourceName, val int64, wUsed map[string]int64, cap *capacity.Capacity, spec *corev1.PodSpec) (string, int64) {
for _, flavor := range cap.RequestableResources[name] {
// We will only check against the flavors labels.
selector := flavorSelector(spec, cap.LabelKeys[name])

_, untolerated := corev1helpers.FindMatchingUntoleratedTaint(flavor.Taints, spec.Tolerations, func(t *corev1.Taint) bool {
return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute
})
if untolerated {
continue
}
if match, _ := selector.Match(&corev1.Node{ObjectMeta: metav1.ObjectMeta{Labels: flavor.Labels}}); !match {
continue
}

// Consider the usage assigned to previous pod sets.
ok, borrow := fitsFlavorLimits(name, val+wUsed[flavor.Name], cap, &flavor)
if ok {
Expand All @@ -256,6 +265,47 @@ func findFlavorForResource(name corev1.ResourceName, val int64, wUsed map[string
return "", 0
}

func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.String) nodeaffinity.RequiredNodeAffinity {
var specCopy corev1.PodSpec

// Remove affinity constraints with irrelevant keys.
if len(spec.NodeSelector) != 0 {
specCopy.NodeSelector = map[string]string{}
for k, v := range spec.NodeSelector {
if allowedKeys.Has(k) {
specCopy.NodeSelector[k] = v
}
}
}

affinity := spec.Affinity
if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
var termsCopy []corev1.NodeSelectorTerm
for _, t := range affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms {
var expCopy []corev1.NodeSelectorRequirement
for _, e := range t.MatchExpressions {
if allowedKeys.Has(e.Key) {
expCopy = append(expCopy, e)
}
}
if len(expCopy) != 0 {
termsCopy = append(termsCopy, corev1.NodeSelectorTerm{MatchExpressions: expCopy})
}
}
if len(termsCopy) != 0 {
specCopy.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: termsCopy,
},
},
}
}

}
return nodeaffinity.GetRequiredNodeAffinity(&corev1.Pod{Spec: specCopy})
}

// fitsFlavorLimits returns whether a requested resource fits in a specific flavor's quota limits.
// If it fits, also returns any borrowing required.
func fitsFlavorLimits(name corev1.ResourceName, val int64, cap *capacity.Capacity, flavor *capacity.FlavorInfo) (bool, int64) {
Expand Down
114 changes: 114 additions & 0 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,120 @@ func TestEntryAssignFlavors(t *testing.T) {
},
},
},
"multiple flavors, fits a node selector": {
wlPods: []kueue.PodSet{
{
Count: 1,
Name: "main",
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")},
},
},
},
// ignored:foo should get ignored
NodeSelector: map[string]string{"cpuType": "two", "ignored1": "foo"},
Affinity: &corev1.Affinity{NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
// this expression should get ignored
Key: "ignored2",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"bar"},
},
},
},
},
}},
},
},
},
},
capacity: capacity.Capacity{
RequestableResources: map[corev1.ResourceName][]capacity.FlavorInfo{
corev1.ResourceCPU: noBorrowing([]capacity.FlavorInfo{
{Name: "one", Guaranteed: 4000, Labels: map[string]string{"cpuType": "one"}},
{Name: "two", Guaranteed: 4000, Labels: map[string]string{"cpuType": "two"}},
}),
},
LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")},
},
wantFits: true,
wantFlavors: map[string]map[corev1.ResourceName]string{
"main": {
corev1.ResourceCPU: "two",
},
},
},
"multiple flavors, fits with node affinity": {
wlPods: []kueue.PodSet{
{
Count: 1,
Name: "main",
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Mi"),
},
},
},
},
NodeSelector: map[string]string{"ignored1": "foo"},
Affinity: &corev1.Affinity{NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "cpuType",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"two"},
},
{
Key: "memType",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"two"},
},
},
},
},
}},
},
},
},
},
capacity: capacity.Capacity{
RequestableResources: map[corev1.ResourceName][]capacity.FlavorInfo{
corev1.ResourceCPU: noBorrowing([]capacity.FlavorInfo{
{Name: "one", Guaranteed: 4000, Labels: map[string]string{"cpuType": "one", "group": "group1"}},
{Name: "two", Guaranteed: 4000, Labels: map[string]string{"cpuType": "two"}},
}),
corev1.ResourceMemory: noBorrowing([]capacity.FlavorInfo{
{Name: "one", Guaranteed: utiltesting.Gi, Labels: map[string]string{"memType": "one"}},
{Name: "two", Guaranteed: utiltesting.Gi, Labels: map[string]string{"memType": "two"}},
}),
},
LabelKeys: map[corev1.ResourceName]sets.String{
corev1.ResourceCPU: sets.NewString("cpuType", "group"),
corev1.ResourceMemory: sets.NewString("memType"),
},
},
wantFits: true,
wantFlavors: map[string]map[corev1.ResourceName]string{
"main": {
corev1.ResourceCPU: "two",
corev1.ResourceMemory: "two",
},
},
},
"multiple specs, fit different flavors": {
wlPods: []kueue.PodSet{
{
Expand Down

0 comments on commit 8045f64

Please sign in to comment.