Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for affinity matching #73

Merged
merged 1 commit into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion pkg/capacity/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
Expand Down Expand Up @@ -77,6 +78,10 @@ type Capacity struct {
RequestableResources map[corev1.ResourceName][]FlavorInfo
UsedResources Resources
Workloads map[string]*workload.Info
// The set of key labels from all flavors of a resource.
// Those keys define the affinity terms of a workload
// that can be matched against the flavors.
LabelKeys map[corev1.ResourceName]sets.String
}

// FlavorInfo holds processed flavor type.
Expand All @@ -85,6 +90,7 @@ type FlavorInfo struct {
Guaranteed int64
Ceiling int64
Taints []corev1.Taint
Labels map[string]string
}

func NewCapacity(cap *kueue.Capacity) *Capacity {
Expand All @@ -95,17 +101,29 @@ func NewCapacity(cap *kueue.Capacity) *Capacity {
Workloads: map[string]*workload.Info{},
}

labelKeys := map[corev1.ResourceName]sets.String{}
for _, r := range cap.Spec.RequestableResources {
if len(r.Flavors) == 0 {
continue
}

resKeys := sets.NewString()
ts := make(map[string]int64, len(r.Flavors))
for _, t := range r.Flavors {
for k := range t.Labels {
resKeys.Insert(k)
}
ts[t.Name] = 0
}
if len(resKeys) != 0 {
labelKeys[r.Name] = resKeys
}
c.UsedResources[r.Name] = ts
}

if len(labelKeys) != 0 {
c.LabelKeys = labelKeys
}
return c
}

Expand Down Expand Up @@ -351,12 +369,20 @@ func resourcesByName(in []kueue.Resource) map[corev1.ResourceName][]FlavorInfo {
flavors := make([]FlavorInfo, len(r.Flavors))
for i := range flavors {
f := &r.Flavors[i]
flavors[i] = FlavorInfo{
fInfo := FlavorInfo{
Name: f.Name,
Guaranteed: workload.ResourceValue(r.Name, f.Quota.Guaranteed),
Ceiling: workload.ResourceValue(r.Name, f.Quota.Ceiling),
Taints: append([]corev1.Taint(nil), f.Taints...),
}
if len(f.Labels) != 0 {
fInfo.Labels = make(map[string]string, len(f.Labels))
for k, v := range f.Labels {
fInfo.Labels[k] = v
}
}
flavors[i] = fInfo

}
out[r.Name] = flavors
}
Expand Down
1 change: 1 addition & 0 deletions pkg/capacity/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (c *Capacity) snapshot() *Capacity {
RequestableResources: c.RequestableResources, // Shallow copy is enough.
UsedResources: make(Resources, len(c.UsedResources)),
Workloads: make(map[string]*workload.Info, len(c.Workloads)),
LabelKeys: c.LabelKeys, // Shallow copy is enough.
}
for res, flavors := range c.UsedResources {
flavorsCopy := make(map[string]int64, len(flavors))
Expand Down
6 changes: 6 additions & 0 deletions pkg/capacity/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
Expand Down Expand Up @@ -55,12 +56,14 @@ func TestSnapshot(t *testing.T) {
Quota: kueue.Quota{
Guaranteed: resource.MustParse("100"),
},
Labels: map[string]string{"foo": "bar", "instance": "on-demand"},
},
{
Name: "spot",
Quota: kueue.Quota{
Guaranteed: resource.MustParse("200"),
},
Labels: map[string]string{"baz": "bar", "instance": "spot"},
},
},
},
Expand Down Expand Up @@ -221,10 +224,12 @@ func TestSnapshot(t *testing.T) {
{
Name: "demand",
Guaranteed: 100_000,
Labels: map[string]string{"foo": "bar", "instance": "on-demand"},
},
{
Name: "spot",
Guaranteed: 200_000,
Labels: map[string]string{"baz": "bar", "instance": "spot"},
},
},
},
Expand All @@ -237,6 +242,7 @@ func TestSnapshot(t *testing.T) {
Workloads: map[string]*workload.Info{
"/alpha": workload.NewInfo(&workloads[0]),
},
LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: {"baz": {}, "foo": {}, "instance": {}}},
},
"foobar": {
Name: "foobar",
Expand Down
72 changes: 68 additions & 4 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -146,7 +148,7 @@ func calculateRequirementsForAssignments(log logr.Logger, workloads []workload.I
continue
}
e := entry{Info: w}
if !e.assignFlavors(c) {
if !e.assignFlavors(log, c) {
log.V(2).Info("Workload didn't fit in remaining capacity even when borrowing")
continue
}
Expand All @@ -160,14 +162,14 @@ func calculateRequirementsForAssignments(log logr.Logger, workloads []workload.I
// borrow from the cohort.
// It returns whether the entry would fit. If it doesn't fit, the object is
// unmodified.
func (e *entry) assignFlavors(cap *capacity.Capacity) bool {
func (e *entry) assignFlavors(log logr.Logger, cap *capacity.Capacity) bool {
flavoredRequests := make([]workload.PodSetResources, 0, len(e.TotalRequests))
wUsed := make(capacity.Resources)
wBorrows := make(capacity.Resources)
for i, podSet := range e.TotalRequests {
flavors := make(map[corev1.ResourceName]string, len(podSet.Requests))
for resName, reqVal := range podSet.Requests {
rFlavor, borrow := findFlavorForResource(resName, reqVal, wUsed[resName], cap, &e.Obj.Spec.Pods[i].Spec)
rFlavor, borrow := findFlavorForResource(log, resName, reqVal, wUsed[resName], cap, &e.Obj.Spec.Pods[i].Spec)
if rFlavor == "" {
return false
}
Expand Down Expand Up @@ -239,14 +241,30 @@ func (s *Scheduler) assign(ctx context.Context, e *entry) error {
// findFlavorForResources returns a flavor which can satisfy the resource request,
// given that wUsed is the usage of flavors by previous podsets.
// If it finds a flavor, also returns any borrowing required.
func findFlavorForResource(name corev1.ResourceName, val int64, wUsed map[string]int64, cap *capacity.Capacity, spec *corev1.PodSpec) (string, int64) {
func findFlavorForResource(
log logr.Logger,
name corev1.ResourceName,
val int64,
wUsed map[string]int64,
cap *capacity.Capacity,
spec *corev1.PodSpec) (string, int64) {
// We will only check against the flavors' labels for the resource.
selector := flavorSelector(spec, cap.LabelKeys[name])
for _, flavor := range cap.RequestableResources[name] {
_, untolerated := corev1helpers.FindMatchingUntoleratedTaint(flavor.Taints, spec.Tolerations, func(t *corev1.Taint) bool {
return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute
})
if untolerated {
continue
}
if match, err := selector.Match(&corev1.Node{ObjectMeta: metav1.ObjectMeta{Labels: flavor.Labels}}); !match || err != nil {
if err != nil {
log.Error(err, "Matching workload affinity against flavor; no flavor assigned")
return "", 0
}
continue
}

// Consider the usage assigned to previous pod sets.
ok, borrow := fitsFlavorLimits(name, val+wUsed[flavor.Name], cap, &flavor)
if ok {
Expand All @@ -256,6 +274,52 @@ func findFlavorForResource(name corev1.ResourceName, val int64, wUsed map[string
return "", 0
}

// flavorSelector builds a node-affinity matcher from the workload's pod spec,
// restricted to the label keys that the capacity's flavors actually define.
// It mirrors the kube-scheduler NodeAffinity Filter plugin as of v1.24.
func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.String) nodeaffinity.RequiredNodeAffinity {
	var filtered corev1.PodSpec

	// Keep only the nodeSelector entries whose keys some flavor can match;
	// constraints on other keys are decided by the real scheduler, not here.
	if len(spec.NodeSelector) != 0 {
		filtered.NodeSelector = make(map[string]string)
		for key, val := range spec.NodeSelector {
			if allowedKeys.Has(key) {
				filtered.NodeSelector[key] = val
			}
		}
	}

	if aff := spec.Affinity; aff != nil && aff.NodeAffinity != nil && aff.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
		var keptTerms []corev1.NodeSelectorTerm
		for _, term := range aff.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms {
			var keptExprs []corev1.NodeSelectorRequirement
			for _, expr := range term.MatchExpressions {
				if allowedKeys.Has(expr.Key) {
					keptExprs = append(keptExprs, expr)
				}
			}
			// Terms are ORed: a term stripped of every expression matches any
			// flavor, so the whole requirement collapses and matching reduces
			// to spec.NodeSelector alone.
			if len(keptExprs) == 0 {
				keptTerms = nil
				break
			}
			keptTerms = append(keptTerms, corev1.NodeSelectorTerm{MatchExpressions: keptExprs})
		}
		if len(keptTerms) != 0 {
			filtered.Affinity = &corev1.Affinity{
				NodeAffinity: &corev1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
						NodeSelectorTerms: keptTerms,
					},
				},
			}
		}
	}
	return nodeaffinity.GetRequiredNodeAffinity(&corev1.Pod{Spec: filtered})
}

// fitsFlavorLimits returns whether a requested resource fits in a specific flavor's quota limits.
// If it fits, also returns any borrowing required.
func fitsFlavorLimits(name corev1.ResourceName, val int64, cap *capacity.Capacity, flavor *capacity.FlavorInfo) (bool, int64) {
Expand Down
Loading