Skip to content

Commit

Permalink
Merge pull request #73 from ahg-g/ahg-affinity
Browse files Browse the repository at this point in the history
Add support for affinity matching
  • Loading branch information
k8s-ci-robot authored Mar 1, 2022
2 parents d74ccc6 + bbdcea9 commit 863efbd
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 7 deletions.
28 changes: 27 additions & 1 deletion pkg/capacity/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
Expand Down Expand Up @@ -78,6 +79,10 @@ type Capacity struct {
RequestableResources map[corev1.ResourceName][]FlavorInfo
UsedResources Resources
Workloads map[string]*workload.Info
// The set of key labels from all flavors of a resource.
// Those keys define the affinity terms of a workload
// that can be matched against the flavors.
LabelKeys map[corev1.ResourceName]sets.String
}

// FlavorInfo holds processed flavor type.
Expand All @@ -86,6 +91,7 @@ type FlavorInfo struct {
Guaranteed int64
Ceiling int64
Taints []corev1.Taint
Labels map[string]string
}

func NewCapacity(cap *kueue.Capacity) *Capacity {
Expand All @@ -96,17 +102,29 @@ func NewCapacity(cap *kueue.Capacity) *Capacity {
Workloads: map[string]*workload.Info{},
}

labelKeys := map[corev1.ResourceName]sets.String{}
for _, r := range cap.Spec.RequestableResources {
if len(r.Flavors) == 0 {
continue
}

resKeys := sets.NewString()
ts := make(map[string]int64, len(r.Flavors))
for _, t := range r.Flavors {
for k := range t.Labels {
resKeys.Insert(k)
}
ts[t.Name] = 0
}
if len(resKeys) != 0 {
labelKeys[r.Name] = resKeys
}
c.UsedResources[r.Name] = ts
}

if len(labelKeys) != 0 {
c.LabelKeys = labelKeys
}
return c
}

Expand Down Expand Up @@ -382,12 +400,20 @@ func resourcesByName(in []kueue.Resource) map[corev1.ResourceName][]FlavorInfo {
flavors := make([]FlavorInfo, len(r.Flavors))
for i := range flavors {
f := &r.Flavors[i]
flavors[i] = FlavorInfo{
fInfo := FlavorInfo{
Name: f.Name,
Guaranteed: workload.ResourceValue(r.Name, f.Quota.Guaranteed),
Ceiling: workload.ResourceValue(r.Name, f.Quota.Ceiling),
Taints: append([]corev1.Taint(nil), f.Taints...),
}
if len(f.Labels) != 0 {
fInfo.Labels = make(map[string]string, len(f.Labels))
for k, v := range f.Labels {
fInfo.Labels[k] = v
}
}
flavors[i] = fInfo

}
out[r.Name] = flavors
}
Expand Down
1 change: 1 addition & 0 deletions pkg/capacity/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (c *Capacity) snapshot() *Capacity {
RequestableResources: c.RequestableResources, // Shallow copy is enough.
UsedResources: make(Resources, len(c.UsedResources)),
Workloads: make(map[string]*workload.Info, len(c.Workloads)),
LabelKeys: c.LabelKeys, // Shallow copy is enough.
}
for res, flavors := range c.UsedResources {
flavorsCopy := make(map[string]int64, len(flavors))
Expand Down
6 changes: 6 additions & 0 deletions pkg/capacity/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
Expand Down Expand Up @@ -55,12 +56,14 @@ func TestSnapshot(t *testing.T) {
Quota: kueue.Quota{
Guaranteed: resource.MustParse("100"),
},
Labels: map[string]string{"foo": "bar", "instance": "on-demand"},
},
{
Name: "spot",
Quota: kueue.Quota{
Guaranteed: resource.MustParse("200"),
},
Labels: map[string]string{"baz": "bar", "instance": "spot"},
},
},
},
Expand Down Expand Up @@ -221,10 +224,12 @@ func TestSnapshot(t *testing.T) {
{
Name: "demand",
Guaranteed: 100_000,
Labels: map[string]string{"foo": "bar", "instance": "on-demand"},
},
{
Name: "spot",
Guaranteed: 200_000,
Labels: map[string]string{"baz": "bar", "instance": "spot"},
},
},
},
Expand All @@ -237,6 +242,7 @@ func TestSnapshot(t *testing.T) {
Workloads: map[string]*workload.Info{
"/alpha": workload.NewInfo(&workloads[0]),
},
LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: {"baz": {}, "foo": {}, "instance": {}}},
},
"foobar": {
Name: "foobar",
Expand Down
72 changes: 68 additions & 4 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -146,7 +148,7 @@ func calculateRequirementsForAssignments(log logr.Logger, workloads []workload.I
continue
}
e := entry{Info: w}
if !e.assignFlavors(c) {
if !e.assignFlavors(log, c) {
log.V(2).Info("Workload didn't fit in remaining capacity even when borrowing")
continue
}
Expand All @@ -160,14 +162,14 @@ func calculateRequirementsForAssignments(log logr.Logger, workloads []workload.I
// borrow from the cohort.
// It returns whether the entry would fit. If it doesn't fit, the object is
// unmodified.
func (e *entry) assignFlavors(cap *capacity.Capacity) bool {
func (e *entry) assignFlavors(log logr.Logger, cap *capacity.Capacity) bool {
flavoredRequests := make([]workload.PodSetResources, 0, len(e.TotalRequests))
wUsed := make(capacity.Resources)
wBorrows := make(capacity.Resources)
for i, podSet := range e.TotalRequests {
flavors := make(map[corev1.ResourceName]string, len(podSet.Requests))
for resName, reqVal := range podSet.Requests {
rFlavor, borrow := findFlavorForResource(resName, reqVal, wUsed[resName], cap, &e.Obj.Spec.Pods[i].Spec)
rFlavor, borrow := findFlavorForResource(log, resName, reqVal, wUsed[resName], cap, &e.Obj.Spec.Pods[i].Spec)
if rFlavor == "" {
return false
}
Expand Down Expand Up @@ -239,14 +241,30 @@ func (s *Scheduler) assign(ctx context.Context, e *entry) error {
// findFlavorForResources returns a flavor which can satisfy the resource request,
// given that wUsed is the usage of flavors by previous podsets.
// If it finds a flavor, also returns any borrowing required.
func findFlavorForResource(name corev1.ResourceName, val int64, wUsed map[string]int64, cap *capacity.Capacity, spec *corev1.PodSpec) (string, int64) {
func findFlavorForResource(
log logr.Logger,
name corev1.ResourceName,
val int64,
wUsed map[string]int64,
cap *capacity.Capacity,
spec *corev1.PodSpec) (string, int64) {
// We will only check against the flavors' labels for the resource.
selector := flavorSelector(spec, cap.LabelKeys[name])
for _, flavor := range cap.RequestableResources[name] {
_, untolerated := corev1helpers.FindMatchingUntoleratedTaint(flavor.Taints, spec.Tolerations, func(t *corev1.Taint) bool {
return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute
})
if untolerated {
continue
}
if match, err := selector.Match(&corev1.Node{ObjectMeta: metav1.ObjectMeta{Labels: flavor.Labels}}); !match || err != nil {
if err != nil {
log.Error(err, "Matching workload affinity against flavor; no flavor assigned")
return "", 0
}
continue
}

// Consider the usage assigned to previous pod sets.
ok, borrow := fitsFlavorLimits(name, val+wUsed[flavor.Name], cap, &flavor)
if ok {
Expand All @@ -256,6 +274,52 @@ func findFlavorForResource(name corev1.ResourceName, val int64, wUsed map[string
return "", 0
}

func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.String) nodeaffinity.RequiredNodeAffinity {
// This function generally replicates the implementation of kube-scheduler's NodeAffintiy
// Filter plugin as of v1.24.
var specCopy corev1.PodSpec

// Remove affinity constraints with irrelevant keys.
if len(spec.NodeSelector) != 0 {
specCopy.NodeSelector = map[string]string{}
for k, v := range spec.NodeSelector {
if allowedKeys.Has(k) {
specCopy.NodeSelector[k] = v
}
}
}

affinity := spec.Affinity
if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
var termsCopy []corev1.NodeSelectorTerm
for _, t := range affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms {
var expCopy []corev1.NodeSelectorRequirement
for _, e := range t.MatchExpressions {
if allowedKeys.Has(e.Key) {
expCopy = append(expCopy, e)
}
}
// If a term becomes empty, it means node affinity matches any flavor since those terms are ORed,
// and so matching gets reduced to spec.NodeSelector
if len(expCopy) == 0 {
termsCopy = nil
break
}
termsCopy = append(termsCopy, corev1.NodeSelectorTerm{MatchExpressions: expCopy})
}
if len(termsCopy) != 0 {
specCopy.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: termsCopy,
},
},
}
}
}
return nodeaffinity.GetRequiredNodeAffinity(&corev1.Pod{Spec: specCopy})
}

// fitsFlavorLimits returns whether a requested resource fits in a specific flavor's quota limits.
// If it fits, also returns any borrowing required.
func fitsFlavorLimits(name corev1.ResourceName, val int64, cap *capacity.Capacity, flavor *capacity.FlavorInfo) (bool, int64) {
Expand Down
Loading

0 comments on commit 863efbd

Please sign in to comment.