From b14e4572de717482c39a0013d123e96ecce9467a Mon Sep 17 00:00:00 2001 From: Sebastian Widmer Date: Fri, 25 Nov 2022 10:16:17 +0100 Subject: [PATCH] Calculate ratio by node class (#46) --- config.go | 40 +++++-- config.yaml | 12 +- config_test.go | 84 ++++++++++++++ controllers/ratio_controller.go | 45 +++++--- controllers/ratio_controller_test.go | 17 ++- limits/limits.go | 32 ++++++ limits/limits_test.go | 82 ++++++++++++++ main.go | 29 ++--- ratio/fetcher.go | 21 +++- ratio/fetcher_test.go | 107 +++++++++++++----- ratio/ratio.go | 8 +- ratio/ratio_test.go | 8 +- webhooks/ratio_validator.go | 146 +++++++++++++++++++----- webhooks/ratio_validator_test.go | 160 +++++++++++++++++++++------ 14 files changed, 650 insertions(+), 141 deletions(-) create mode 100644 config_test.go create mode 100644 limits/limits.go create mode 100644 limits/limits_test.go diff --git a/config.go b/config.go index 73b4fe9..271d6da 100644 --- a/config.go +++ b/config.go @@ -4,7 +4,9 @@ import ( "errors" "os" + "github.com/appuio/appuio-cloud-agent/limits" "go.uber.org/multierr" + "k8s.io/apimachinery/pkg/api/resource" "sigs.k8s.io/yaml" ) @@ -13,7 +15,12 @@ type Config struct { OrganizationLabel string // MemoryPerCoreLimit is the fair use limit of memory usage per CPU core - MemoryPerCoreLimit string + // it is deprecated and will be removed in a future version. + // Use MemoryPerCoreLimits: {Limit: "XGi"} instead. + MemoryPerCoreLimit *resource.Quantity + // MemoryPerCoreLimits is the fair use limit of memory usage per CPU core + // It is possible to select limits by node selector labels + MemoryPerCoreLimits limits.Limits // Privileged* is a list of the given type allowed to bypass restrictions. // Wildcards are supported (e.g. "system:serviceaccount:default:*" or "cluster-*-operator"). @@ -35,13 +42,18 @@ type Config struct { DefaultOrganizationClusterRoles map[string]string } -func ConfigFromFile(path string) (Config, error) { +func ConfigFromFile(path string) (c Config, warn []string, err error) { raw, err := os.ReadFile(path) if err != nil { - return Config{}, err + return Config{}, nil, err } - var c Config - return c, yaml.Unmarshal(raw, &c, yaml.DisallowUnknownFields) + err = yaml.Unmarshal(raw, &c, yaml.DisallowUnknownFields) + if err != nil { + return Config{}, nil, err + } + + c, warnings := migrateConfig(c) + return c, warnings, nil } func (c Config) Validate() error { @@ -50,9 +62,21 @@ func (c Config) Validate() error { if c.OrganizationLabel == "" { errs = append(errs, errors.New("OrganizationLabel must not be empty")) } - if c.MemoryPerCoreLimit == "" { - errs = append(errs, errors.New("MemoryPerCoreLimit must not be empty")) - } return multierr.Combine(errs...) } + +func migrateConfig(c Config) (Config, []string) { + warnings := make([]string, 0) + + if c.MemoryPerCoreLimit != nil && c.MemoryPerCoreLimits == nil { + warnings = append(warnings, "MemoryPerCoreLimit is deprecated and will be removed in a future version. Use MemoryPerCoreLimits: {Limit: \"XGi\"} instead.") + c.MemoryPerCoreLimits = limits.Limits{ + { + Limit: c.MemoryPerCoreLimit, + }, + } + } + + return c, warnings +} diff --git a/config.yaml b/config.yaml index b81e614..67b4ab5 100644 --- a/config.yaml +++ b/config.yaml @@ -1,8 +1,16 @@ --- # The label used to mark namespaces to belong to an organization OrganizationLabel: appuio.io/organization -# The fair use limit of memory usage per CPU core -MemoryPerCoreLimit: 4Gi + +# The fair use limit of memory usage per CPU core. +# It is possible to select limits by node selector labels. +MemoryPerCoreLimits: +- Limit: 4Gi + NodeSelector: + matchExpressions: + - key: class + operator: DoesNotExist + # Privileged* is a list of the given type allowed to bypass restrictions. # Wildcards are supported (e.g. "system:serviceaccount:default:*" or "cluster-*-operator"). # ClusterRoles are only ever matched if they are bound through a ClusterRoleBinding, diff --git a/config_test.go b/config_test.go new file mode 100644 index 0000000..e96fad6 --- /dev/null +++ b/config_test.go @@ -0,0 +1,84 @@ +package main + +import ( + "os" + "path/filepath" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const limitsYAML = ` +MemoryPerCoreLimits: +- NodeSelector: + matchExpressions: + - key: class + operator: DoesNotExist + Limit: 2Gi +` +const limitYAML = ` +MemoryPerCoreLimit: 1Gi +` + +func Test_Config_MemoryPerCoreLimits(t *testing.T) { + testCases := []struct { + desc string + yaml string + warnings int + limit string + nodeSelector metav1.LabelSelector + }{ + { + desc: "MemoryPerCoreLimits", + yaml: limitsYAML, + warnings: 0, + limit: "2Gi", + nodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "class", + Operator: metav1.LabelSelectorOpDoesNotExist, + }, + }, + }, + }, + { + desc: "MemoryPerCoreLimit_Migrate", + yaml: limitYAML, + warnings: 1, + limit: "1Gi", + nodeSelector: metav1.LabelSelector{}, + }, + { + desc: "MemoryPerCoreLimit_NoMigrateIfMemoryPerCoreLimitsIsSet", + yaml: strings.Join([]string{limitsYAML, limitYAML}, "\n"), + warnings: 0, + limit: "2Gi", + nodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "class", + Operator: metav1.LabelSelectorOpDoesNotExist, + }, + }, + }, + }, + } + for _, tC := range testCases { + t.Run(tC.desc, func(t *testing.T) { + tmp := t.TempDir() + configPath := filepath.Join(tmp, "config.yaml") + require.NoError(t, os.WriteFile(configPath, []byte(tC.yaml), 0o644)) + + c, warnings, err := ConfigFromFile(configPath) + assert.Len(t, warnings, tC.warnings) + require.NoError(t, err) + + assert.Equal(t, tC.limit, c.MemoryPerCoreLimits[0].Limit.String()) + assert.Equal(t, tC.nodeSelector, c.MemoryPerCoreLimits[0].NodeSelector) + }) + } +} diff --git a/controllers/ratio_controller.go b/controllers/ratio_controller.go index c2fe168..d28576f 100644 --- a/controllers/ratio_controller.go +++ b/controllers/ratio_controller.go @@ -3,12 +3,15 @@ package controllers import ( "context" "errors" + "fmt" + "github.com/appuio/appuio-cloud-agent/limits" "github.com/appuio/appuio-cloud-agent/ratio" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -21,12 +24,12 @@ type RatioReconciler struct { Recorder record.EventRecorder Scheme *runtime.Scheme - Ratio ratioFetcher - RatioLimit *resource.Quantity + Ratio ratioFetcher + RatioLimits limits.Limits } type ratioFetcher interface { - FetchRatio(ctx context.Context, ns string) (*ratio.Ratio, error) + FetchRatios(ctx context.Context, ns string) (map[string]*ratio.Ratio, error) } //+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch @@ -38,7 +41,7 @@ var eventReason = "TooMuchCPURequest" func (r *RatioReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { l := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name) - nsRatio, err := r.Ratio.FetchRatio(ctx, req.Namespace) + nsRatios, err := r.Ratio.FetchRatios(ctx, req.Namespace) if err != nil { if errors.Is(err, ratio.ErrorDisabled) { l.V(1).Info("namespace disabled") @@ -48,21 +51,33 @@ func (r *RatioReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, err } - if nsRatio.Below(*r.RatioLimit) { - l.Info("recording warn event: ratio too low") - - if err := r.warnPod(ctx, req.Name, req.Namespace, nsRatio); err != nil { - l.Error(err, "failed to record event on pod") + for nodeSel, ratio := range nsRatios { + sel, err := labels.ConvertSelectorToLabelsMap(nodeSel) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to convert node selector '%s' to labels map: %w", nodeSel, err) + } + limit := r.RatioLimits.GetLimitForNodeSelector(sel) + if limit == nil { + l.Info("no limit found for node selector", "nodeSelector", nodeSel) + continue } - if err := r.warnNamespace(ctx, req.Namespace, nsRatio); err != nil { - l.Error(err, "failed to record event on namespace") + + if ratio.Below(*limit) { + l.Info("recording warn event: ratio too low") + + if err := r.warnPod(ctx, req.Name, req.Namespace, ratio, nodeSel, limit); err != nil { + l.Error(err, "failed to record event on pod") + } + if err := r.warnNamespace(ctx, req.Namespace, ratio, nodeSel, limit); err != nil { + l.Error(err, "failed to record event on namespace") + } } } return ctrl.Result{}, nil } -func (r *RatioReconciler) warnPod(ctx context.Context, name, namespace string, nsRatio *ratio.Ratio) error { +func (r *RatioReconciler) warnPod(ctx context.Context, name, namespace string, nsRatio *ratio.Ratio, sel string, limit *resource.Quantity) error { pod := corev1.Pod{} err := r.Get(ctx, client.ObjectKey{ Namespace: namespace, @@ -71,10 +86,10 @@ func (r *RatioReconciler) warnPod(ctx context.Context, name, namespace string, n if err != nil { return err } - r.Recorder.Event(&pod, "Warning", eventReason, nsRatio.Warn(r.RatioLimit)) + r.Recorder.Event(&pod, "Warning", eventReason, nsRatio.Warn(limit, sel)) return nil } -func (r *RatioReconciler) warnNamespace(ctx context.Context, name string, nsRatio *ratio.Ratio) error { +func (r *RatioReconciler) warnNamespace(ctx context.Context, name string, nsRatio *ratio.Ratio, sel string, limit *resource.Quantity) error { ns := corev1.Namespace{} err := r.Get(ctx, client.ObjectKey{ Name: name, @@ -82,7 +97,7 @@ func (r *RatioReconciler) warnNamespace(ctx context.Context, name string, nsRati if err != nil { return err } - r.Recorder.Event(&ns, "Warning", eventReason, nsRatio.Warn(r.RatioLimit)) + r.Recorder.Event(&ns, "Warning", eventReason, nsRatio.Warn(limit, sel)) return nil } diff --git a/controllers/ratio_controller_test.go b/controllers/ratio_controller_test.go index 683046c..470b72d 100644 --- a/controllers/ratio_controller_test.go +++ b/controllers/ratio_controller_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/appuio/appuio-cloud-agent/limits" "github.com/appuio/appuio-cloud-agent/ratio" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -164,22 +165,26 @@ func prepareRatioTest(t *testing.T, cfg testRatioCfg) *RatioReconciler { Scheme: scheme, Ratio: fakeRatioFetcher{ err: cfg.fetchErr, - ratio: &ratio.Ratio{ + ratios: map[string]*ratio.Ratio{"": { CPU: cfg.fetchCPU.AsDec(), Memory: cfg.fetchMemory.AsDec(), }, + }}, + RatioLimits: limits.Limits{ + { + Limit: &cfg.limit, + }, }, - RatioLimit: &cfg.limit, } } type fakeRatioFetcher struct { - err error - ratio *ratio.Ratio + err error + ratios map[string]*ratio.Ratio } -func (f fakeRatioFetcher) FetchRatio(ctx context.Context, ns string) (*ratio.Ratio, error) { - return f.ratio, f.err +func (f fakeRatioFetcher) FetchRatios(ctx context.Context, ns string) (map[string]*ratio.Ratio, error) { + return f.ratios, f.err } var testNs = &corev1.Namespace{ diff --git a/limits/limits.go b/limits/limits.go new file mode 100644 index 0000000..9800412 --- /dev/null +++ b/limits/limits.go @@ -0,0 +1,32 @@ +package limits + +import ( + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +type Limit struct { + NodeSelector metav1.LabelSelector + Limit *resource.Quantity +} + +type Limits []Limit + +// GetLimitForNodeSelector returns the first limit that matches the given node selector. +// If no limit matches, an empty string is returned. +// Limits with invalid node selectors are ignored. +func (l Limits) GetLimitForNodeSelector(nodeSelector map[string]string) *resource.Quantity { + for _, limit := range l { + limitSel, err := metav1.LabelSelectorAsSelector(&limit.NodeSelector) + if err != nil { + continue + } + + if limitSel.Matches(labels.Set(nodeSelector)) { + return limit.Limit + } + } + + return nil +} diff --git a/limits/limits_test.go b/limits/limits_test.go new file mode 100644 index 0000000..8684199 --- /dev/null +++ b/limits/limits_test.go @@ -0,0 +1,82 @@ +package limits_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/appuio/appuio-cloud-agent/limits" +) + +func TestGetLimitForNodeSelector(t *testing.T) { + subject := limits.Limits{ + { + NodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "class", + Operator: "Invalid", + }, + }, + }, + Limit: requireParseQuantity(t, "7Gi"), + }, + { + NodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "class", + Operator: metav1.LabelSelectorOpDoesNotExist, + }, + }, + }, + Limit: requireParseQuantity(t, "7Gi"), + }, + { + NodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "class", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"highmem"}, + }, + }, + }, + Limit: requireParseQuantity(t, "14Gi"), + }, + { + NodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "gpu", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"tesla"}, + }, + }, + }, + Limit: requireParseQuantity(t, "2Gi"), + }, + } + + assert.Equal(t, "7Gi", subject.GetLimitForNodeSelector(map[string]string{}).String()) + assert.Equal(t, "7Gi", subject.GetLimitForNodeSelector(map[string]string{"blubber": "blubber"}).String()) + + assert.Equal(t, "14Gi", subject.GetLimitForNodeSelector(map[string]string{"class": "highmem"}).String()) + assert.Equal(t, "14Gi", subject.GetLimitForNodeSelector(map[string]string{"class": "highmem", "blubber": "blubber"}).String()) + // First match wins + assert.Equal(t, "14Gi", subject.GetLimitForNodeSelector(map[string]string{"class": "highmem", "gpu": "tesla"}).String()) + + assert.Equal(t, "2Gi", subject.GetLimitForNodeSelector(map[string]string{"class": "other", "gpu": "tesla"}).String()) + + assert.Nil(t, subject.GetLimitForNodeSelector(map[string]string{"class": "other", "gpu": "other"})) +} + +func requireParseQuantity(t *testing.T, s string) *resource.Quantity { + t.Helper() + q, err := resource.ParseQuantity(s) + require.NoError(t, err) + return &q +} diff --git a/main.go b/main.go index 16fea60..75d9fb1 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,6 @@ import ( "os" "time" - "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -57,11 +56,14 @@ func main() { flag.Parse() ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - conf, err := ConfigFromFile(*configFilePath) + conf, warnings, err := ConfigFromFile(*configFilePath) if err != nil { setupLog.Error(err, "unable to read config file") os.Exit(1) } + for _, warning := range warnings { + setupLog.Info("WARNING " + warning) + } if err := conf.Validate(); err != nil { setupLog.Error(err, "invalid configuration") os.Exit(1) @@ -83,7 +85,7 @@ func main() { os.Exit(1) } - registerRatioController(mgr, conf.MemoryPerCoreLimit, conf.OrganizationLabel) + registerRatioController(mgr, conf, conf.OrganizationLabel) registerOrganizationRBACController(mgr, conf.OrganizationLabel, conf.DefaultOrganizationClusterRoles) // Currently unused, but will be used for the next kyverno replacements @@ -139,15 +141,14 @@ func registerOrganizationRBACController(mgr ctrl.Manager, orgLabel string, defau } } -func registerRatioController(mgr ctrl.Manager, memoryCPURatio, orgLabel string) { - limit, err := resource.ParseQuantity(memoryCPURatio) - if err != nil { - setupLog.Error(err, "unable to parse memory-per-core-limit") - os.Exit(1) - } +func registerRatioController(mgr ctrl.Manager, conf Config, orgLabel string) { mgr.GetWebhookServer().Register("/validate-request-ratio", &webhook.Admission{ Handler: &webhooks.RatioValidator{ - RatioLimit: &limit, + DefaultNodeSelector: conf.DefaultNodeSelector, + DefaultNamespaceNodeSelectorAnnotation: conf.DefaultNamespaceNodeSelectorAnnotation, + + Client: mgr.GetClient(), + RatioLimits: conf.MemoryPerCoreLimits, Ratio: &ratio.Fetcher{ Client: mgr.GetClient(), }, @@ -155,10 +156,10 @@ func registerRatioController(mgr ctrl.Manager, memoryCPURatio, orgLabel string) }) if err := (&controllers.RatioReconciler{ - Client: mgr.GetClient(), - Recorder: mgr.GetEventRecorderFor("resource-ratio-controller"), - Scheme: mgr.GetScheme(), - RatioLimit: &limit, + Client: mgr.GetClient(), + Recorder: mgr.GetEventRecorderFor("resource-ratio-controller"), + Scheme: mgr.GetScheme(), + RatioLimits: conf.MemoryPerCoreLimits, Ratio: &ratio.Fetcher{ Client: mgr.GetClient(), OrganizationLabel: orgLabel, diff --git a/ratio/fetcher.go b/ratio/fetcher.go index 6300888..4d899c2 100644 --- a/ratio/fetcher.go +++ b/ratio/fetcher.go @@ -6,6 +6,7 @@ import ( "strconv" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -22,8 +23,8 @@ type Fetcher struct { OrganizationLabel string } -// FetchRatio collects the CPU to memory request ratio for the given namespace -func (f Fetcher) FetchRatio(ctx context.Context, name string) (*Ratio, error) { +// FetchRatios collects the CPU to memory request ratio for the given namespace grouped by `.spec.nodeSelector`. +func (f Fetcher) FetchRatios(ctx context.Context, name string) (map[string]*Ratio, error) { ns := corev1.Namespace{} err := f.Client.Get(ctx, client.ObjectKey{ Name: name, @@ -46,11 +47,21 @@ func (f Fetcher) FetchRatio(ctx context.Context, name string) (*Ratio, error) { } } - r := NewRatio() pods := corev1.PodList{} err = f.Client.List(ctx, &pods, client.InNamespace(name)) if err != nil { - return r, err + return nil, err } - return r.RecordPod(pods.Items...), nil + + ratios := make(map[string]*Ratio) + for _, pod := range pods.Items { + k := labels.Set(pod.Spec.NodeSelector).String() + r, ok := ratios[k] + if !ok { + r = NewRatio() + } + ratios[k] = r.RecordPod(pod) + } + + return ratios, nil } diff --git a/ratio/fetcher_test.go b/ratio/fetcher_test.go index a7cf32c..7989fc7 100644 --- a/ratio/fetcher_test.go +++ b/ratio/fetcher_test.go @@ -38,19 +38,21 @@ var ( func TestRatioValidator_Handle(t *testing.T) { ctx := context.Background() + type testRatio map[string]struct { + cpu string + memory string + } tests := map[string]struct { namespace string objects []client.Object orgLabel string - memory string - cpu string + ratios testRatio err error errCheck func(err error) bool }{ "Fetch_EmptyNamespace": { namespace: "foo", - memory: "0", - cpu: "0", + ratios: testRatio{}, }, "Fetch_Namespace": { namespace: "foo", @@ -59,8 +61,40 @@ func TestRatioValidator_Handle(t *testing.T) { foo2Pod, foobarPod, }, - memory: "3Gi", - cpu: "3", + ratios: testRatio{"": { + memory: "3Gi", + cpu: "3", + }}, + }, + "Fetch_WithDifferentNodeSelector": { + namespace: "foo", + objects: []client.Object{ + tap(fooPod, func(p *corev1.Pod) *corev1.Pod { + p.Spec.NodeSelector = map[string]string{"class": "mega"} + return p + }), + foo2Pod, + tap(foo2Pod, func(p *corev1.Pod) *corev1.Pod { + p.ObjectMeta.Name = "foo3" + p.Spec.NodeSelector = map[string]string{"class": "huge"} + return p + }), + foobarPod, + }, + ratios: testRatio{ + "": { + memory: "1Gi", + cpu: "0", + }, + "class=huge": { + memory: "1Gi", + cpu: "0", + }, + "class=mega": { + memory: "2Gi", + cpu: "3", + }, + }, }, "Fetch_NotExists": { namespace: "not-exist", @@ -79,8 +113,10 @@ func TestRatioValidator_Handle(t *testing.T) { foo2Pod, foobarPod, }, - memory: "1337Gi", - cpu: "0", + ratios: testRatio{"": { + memory: "1337Gi", + cpu: "0", + }}, }, "Fetch_WronglyDisabledNamespace": { namespace: "notdisabled-bar", @@ -94,8 +130,10 @@ func TestRatioValidator_Handle(t *testing.T) { phase: corev1.PodRunning, }), }, - memory: "1337Gi", - cpu: "0", + ratios: testRatio{"": { + memory: "1337Gi", + cpu: "0", + }}, }, "Fetch_DisabledNamespace": { @@ -166,29 +204,36 @@ func TestRatioValidator_Handle(t *testing.T) { }), foobarPod, }, - memory: "3Gi", - cpu: "3", + ratios: testRatio{"": { + memory: "3Gi", + cpu: "3", + }}, }, } - for _, tc := range tests { - r, err := prepareTest(t, testCfg{ - initObjs: tc.objects, - orgLabel: tc.orgLabel, - }).FetchRatio(ctx, tc.namespace) - if tc.err == nil { - require.NoError(t, err) - cpu := resource.MustParse(tc.cpu) - mem := resource.MustParse(tc.memory) - assert.Equal(t, *cpu.AsDec(), *r.CPU, "cpu requests equal") - assert.Equal(t, *mem.AsDec(), *r.Memory, "memory requests equal") - } else { - if tc.errCheck != nil { - require.Truef(t, tc.errCheck(err), "Unexpected error") + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + r, err := prepareTest(t, testCfg{ + initObjs: tc.objects, + orgLabel: tc.orgLabel, + }).FetchRatios(ctx, tc.namespace) + if tc.err == nil { + require.NoError(t, err) + require.Len(t, r, len(tc.ratios)) + for nodeSel, ratio := range tc.ratios { + cpu := resource.MustParse(ratio.cpu) + mem := resource.MustParse(ratio.memory) + assert.Equal(t, *cpu.AsDec(), *r[nodeSel].CPU, "cpu requests should be equal for node selector %q", nodeSel) + assert.Equal(t, *mem.AsDec(), *r[nodeSel].Memory, "memory requests should be equal for node selector %q", nodeSel) + } } else { - require.ErrorIs(t, err, tc.err) + if tc.errCheck != nil { + require.Truef(t, tc.errCheck(err), "Unexpected error") + } else { + require.ErrorIs(t, err, tc.err) + } } - } + }) } } @@ -234,3 +279,9 @@ func prepareTest(t *testing.T, cfg testCfg) Fetcher { OrganizationLabel: cfg.orgLabel, } } + +// tap deep copies the given object and allows to modify it using the given function +func tap[T runtime.Object](t T, f func(T) T) T { + ct := t.DeepCopyObject().(T) + return f(ct) +} diff --git a/ratio/ratio.go b/ratio/ratio.go index 558a1a1..49bde57 100644 --- a/ratio/ratio.go +++ b/ratio/ratio.go @@ -102,9 +102,13 @@ func (r Ratio) String() string { } // Warn returns a warning string explaining that the ratio is not considered fair use -func (r Ratio) Warn(limit *resource.Quantity) string { +func (r Ratio) Warn(limit *resource.Quantity, nodeSelector string) string { // WARNING(glrf) Warnings MUST NOT contain newlines. K8s will simply drop your warning if you add newlines. - w := fmt.Sprintf("Current memory to CPU ratio of %s/core in this namespace is below the fair use ratio", r.String()) + w := fmt.Sprintf("Current memory to CPU ratio of %s/core", r.String()) + if nodeSelector != "" { + w = fmt.Sprintf("%s for node type %q", w, nodeSelector) + } + w = fmt.Sprintf("%s in this namespace is below the fair use ratio", w) if limit != nil { w = fmt.Sprintf("%s of %s/core", w, limit) } diff --git a/ratio/ratio_test.go b/ratio/ratio_test.go index 61cd422..0518355 100644 --- a/ratio/ratio_test.go +++ b/ratio/ratio_test.go @@ -288,9 +288,11 @@ func TestRatio_Warn(t *testing.T) { CPU: cpu.AsDec(), Memory: memory.AsDec(), } - assert.Contains(t, r.Warn(nil), "1Gi") + assert.Contains(t, r.Warn(nil, ""), "1Gi") lim := resource.MustParse("1Mi") - assert.Contains(t, r.Warn(&lim), "1Mi") + assert.Contains(t, r.Warn(&lim, ""), "1Mi") + + assert.Contains(t, r.Warn(&lim, "class=x"), "class=x") } func FuzzRatio(f *testing.F) { @@ -305,7 +307,7 @@ func FuzzRatio(f *testing.F) { Memory: memQuant.AsDec(), } lim := resource.MustParse(fmt.Sprintf("%dMi", limit)) - out := r.Warn(&lim) + out := r.Warn(&lim, "") assert.NotEmpty(t, out) r.Below(lim) diff --git a/webhooks/ratio_validator.go b/webhooks/ratio_validator.go index efc4958..810d16c 100644 --- a/webhooks/ratio_validator.go +++ b/webhooks/ratio_validator.go @@ -6,16 +6,18 @@ import ( "net/http" "strings" - "github.com/appuio/appuio-cloud-agent/ratio" admissionv1 "k8s.io/api/admission/v1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + "github.com/appuio/appuio-cloud-agent/limits" + "github.com/appuio/appuio-cloud-agent/ratio" ) // +kubebuilder:webhook:path=/validate-request-ratio,name=validate-request-ratio.appuio.io,admissionReviewVersions=v1,sideEffects=none,mutating=false,failurePolicy=ignore,groups=*,resources=*,verbs=create;update,versions=*,matchPolicy=equivalent @@ -26,13 +28,19 @@ import ( // RatioValidator checks for every action in a namespace whether the Memory to CPU ratio limit is exceeded and will return a warning if it is. type RatioValidator struct { decoder *admission.Decoder + Client client.Client - Ratio ratioFetcher - RatioLimit *resource.Quantity + Ratio ratioFetcher + RatioLimits limits.Limits + + // DefaultNodeSelector is the default node selector to apply to pods + DefaultNodeSelector map[string]string + // DefaultNamespaceNodeSelectorAnnotation is the annotation to use for the default node selector + DefaultNamespaceNodeSelectorAnnotation string } type ratioFetcher interface { - FetchRatio(ctx context.Context, ns string) (*ratio.Ratio, error) + FetchRatios(ctx context.Context, ns string) (map[string]*ratio.Ratio, error) } // Handle handles the admission requests @@ -48,7 +56,7 @@ func (v *RatioValidator) Handle(ctx context.Context, req admission.Request) admi return admission.Allowed("system user") } - r, err := v.Ratio.FetchRatio(ctx, req.Namespace) + ratios, err := v.Ratio.FetchRatios(ctx, req.Namespace) if err != nil { if errors.Is(err, ratio.ErrorDisabled) { l.V(1).Info("allowed: namespace disabled") @@ -62,30 +70,65 @@ func (v *RatioValidator) Handle(ctx context.Context, req admission.Request) admi return errored(http.StatusInternalServerError, err) } - l = l.WithValues("current_ratio", r.String()) + nodeSel, err := v.getNodeSelector(req) + if err != nil { + l.Error(err, "failed to get node selector") + return errored(http.StatusBadRequest, err) + } + if len(nodeSel) == 0 { + sel, err := v.getDefaultNodeSelectorFromNamespace(ctx, req.Namespace) + if err != nil { + l.Error(err, "failed to get default node selector from namespace") + } + nodeSel = sel + } + if len(nodeSel) == 0 { + nodeSel = v.DefaultNodeSelector + } + + l = l.WithValues("current_ratios", ratios, "node_selector", nodeSel) // If we are creating an object with resource requests, we add them to the current ratio // We cannot easily do this when updating resources. if req.Operation == admissionv1.Create { + key := fuzzyMatchRatioKey(labels.Set(nodeSel).String(), ratios) + r := ratios[key] + if r == nil { + r = ratio.NewRatio() + } r, err = v.recordObject(ctx, r, req) if err != nil { l.Error(err, "failed to record object") return errored(http.StatusBadRequest, err) } + ratios[key] = r + + l = l.WithValues("ratio", r) + } + + warnings := make([]string, 0, len(ratios)) + for nodeSel, r := range ratios { + sel, err := labels.ConvertSelectorToLabelsMap(nodeSel) + if err != nil { + return errored(http.StatusInternalServerError, err) + } + limit := v.RatioLimits.GetLimitForNodeSelector(sel) + if limit == nil { + l.Info("no limit found for node selector", "nodeSelector", nodeSel) + continue + } + + if r.Below(*limit) { + l.Info("ratio too low", "node_selector", nodeSel, "ratio", r) + warnings = append(warnings, r.Warn(limit, nodeSel)) + } } - l = l.WithValues("ratio", r.String()) - - if r.Below(*v.RatioLimit) { - l.Info("warned: ratio too low") - return admission.Response{ - AdmissionResponse: admissionv1.AdmissionResponse{ - Allowed: true, - Warnings: []string{ - r.Warn(v.RatioLimit), - }, - }} + + return admission.Response{ + AdmissionResponse: admissionv1.AdmissionResponse{ + Allowed: true, + Warnings: warnings, + }, } - l.V(1).Info("allowed: ratio ok") - return admission.Allowed("ok") } func (v *RatioValidator) recordObject(ctx context.Context, r *ratio.Ratio, req admission.Request) (*ratio.Ratio, error) { @@ -112,6 +155,45 @@ func (v *RatioValidator) recordObject(ctx context.Context, r *ratio.Ratio, req a return r, nil } +func (v *RatioValidator) getNodeSelector(req admission.Request) (map[string]string, error) { + switch req.Kind.Kind { + case "Pod": + pod := corev1.Pod{} + if err := v.decoder.Decode(req, &pod); err != nil { + return nil, err + } + return pod.Spec.NodeSelector, nil + case "Deployment": + deploy := appsv1.Deployment{} + if err := v.decoder.Decode(req, &deploy); err != nil { + return nil, err + } + return deploy.Spec.Template.Spec.NodeSelector, nil + case "StatefulSet": + sts := appsv1.StatefulSet{} + if err := v.decoder.Decode(req, &sts); err != nil { + return nil, err + } + return sts.Spec.Template.Spec.NodeSelector, nil + } + return nil, nil +} + +func (v *RatioValidator) getDefaultNodeSelectorFromNamespace(ctx context.Context, namespace string) (map[string]string, error) { + ns := corev1.Namespace{} + err := v.Client.Get(ctx, client.ObjectKey{Name: namespace}, &ns) + if err != nil { + return nil, err + } + return labels.ConvertSelectorToLabelsMap(ns.Annotations[v.DefaultNamespaceNodeSelectorAnnotation]) +} + +// InjectDecoder injects a Admission request decoder +func (v *RatioValidator) InjectDecoder(d *admission.Decoder) error { + v.decoder = d + return nil +} + func errored(code int32, err error) admission.Response { return admission.Response{ AdmissionResponse: admissionv1.AdmissionResponse{ @@ -124,8 +206,22 @@ func errored(code int32, err error) admission.Response { } } -// InjectDecoder injects a Admission request decoder -func (v *RatioValidator) InjectDecoder(d *admission.Decoder) error { - v.decoder = d - return nil +// fuzzyMatchRatioKey returns the key in ratios that matches the node selector. +// If there is no exact match, it returns the key that matches a subset of the labels. +// This is done if the node selector comes from the default node selector which does +// not contain all labels that might be added by the scheduler. +func fuzzyMatchRatioKey(s string, ratios map[string]*ratio.Ratio) string { + if _, ok := ratios[s]; ok { + return s + } + + sel, _ := labels.Parse(s) + for k := range ratios { + ks, _ := labels.ConvertSelectorToLabelsMap(k) + if sel.Matches(ks) { + return k + } + } + + return s } diff --git a/webhooks/ratio_validator_test.go b/webhooks/ratio_validator_test.go index 732d5a7..c3fed1d 100644 --- a/webhooks/ratio_validator_test.go +++ b/webhooks/ratio_validator_test.go @@ -21,6 +21,7 @@ import ( "testing" + "github.com/appuio/appuio-cloud-agent/limits" "github.com/appuio/appuio-cloud-agent/ratio" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -39,7 +40,7 @@ func TestRatioValidator_Handle(t *testing.T) { object client.Object mangleObject bool create bool - limit string + limits limits.Limits warn bool fail bool statusCode int32 @@ -48,7 +49,7 @@ func TestRatioValidator_Handle(t *testing.T) { "Allow_EmptyNamespace": { user: "appuio#foo", namespace: "foo", - limit: "4Gi", + limits: limits.Limits{{Limit: requireParseQuantity(t, "4Gi")}}, warn: false, }, "Allow_FairNamespace": { @@ -65,8 +66,8 @@ func TestRatioValidator_Handle(t *testing.T) { {cpu: "5", memory: "1Gi"}, }), }, - limit: "4Gi", - warn: false, + limits: limits.Limits{{Limit: requireParseQuantity(t, "4Gi")}}, + warn: false, }, "Warn_UnfairNamespace": { user: "appuio#foo", @@ -82,8 +83,31 @@ func TestRatioValidator_Handle(t *testing.T) { {cpu: "8", memory: "1Gi"}, }), }, - limit: "1Gi", - warn: true, + limits: limits.Limits{{Limit: requireParseQuantity(t, "1Gi")}}, + warn: true, + }, + "Warn_UnfairNamespace_Selectors": { + user: "appuio#foo", + namespace: "foo", + resources: []client.Object{ + podFromResources("fair", "foo", podResource{ + {cpu: "1", memory: "4Gi"}, + }, func(p *corev1.Pod) { + p.Spec.NodeSelector = map[string]string{"class": "foo"} + }), + podFromResources("unfair", "foo", podResource{ + {cpu: "2", memory: "1Gi"}, + }, func(p *corev1.Pod) { + p.Spec.NodeSelector = map[string]string{"class": "bar"} + }), + }, + limits: limits.Limits{{ + NodeSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{"class": "bar"}, + }, + Limit: requireParseQuantity(t, "1Gi"), + }}, + warn: true, }, "Allow_DisabledUnfairNamespace": { user: "appuio#foo", @@ -93,8 +117,8 @@ func TestRatioValidator_Handle(t *testing.T) { {cpu: "8", memory: "1Gi"}, }), }, - limit: "1Gi", - warn: false, + limits: limits.Limits{{Limit: requireParseQuantity(t, "1Gi")}}, + warn: false, }, "Allow_LowercaseDisabledUnfairNamespace": { user: "appuio#foo", @@ -104,8 +128,8 @@ func TestRatioValidator_Handle(t *testing.T) { {cpu: "8", memory: "1Gi"}, }), }, - limit: "1Gi", - warn: false, + limits: limits.Limits{{Limit: requireParseQuantity(t, "1Gi")}}, + warn: false, }, "Allow_ServiceAccount": { @@ -122,8 +146,8 @@ func TestRatioValidator_Handle(t *testing.T) { {cpu: "8", memory: "1Gi"}, }), }, - limit: "1Gi", - warn: false, + limits: limits.Limits{{Limit: requireParseQuantity(t, "1Gi")}}, + warn: false, }, "ListFailure": { user: "bar", @@ -140,7 +164,7 @@ func TestRatioValidator_Handle(t *testing.T) { {cpu: "8", memory: "1Gi"}, }), }, - limit: "1Gi", + limits: limits.Limits{{Limit: requireParseQuantity(t, "1Gi")}}, warn: false, fail: true, statusCode: http.StatusInternalServerError, @@ -149,7 +173,7 @@ func TestRatioValidator_Handle(t *testing.T) { user: "bar", namespace: "notexits", resources: []client.Object{}, - limit: "1Gi", + limits: limits.Limits{{Limit: requireParseQuantity(t, "1Gi")}}, warn: false, fail: true, statusCode: http.StatusNotFound, @@ -169,7 +193,7 @@ func TestRatioValidator_Handle(t *testing.T) { object: podFromResources("unfair", "foo", podResource{ {cpu: "8", memory: "1Gi"}, }), - limit: "4Gi", + limits: limits.Limits{{Limit: requireParseQuantity(t, "4Gi")}}, warn: true, create: true, }, @@ -181,24 +205,62 @@ func TestRatioValidator_Handle(t *testing.T) { {cpu: "8", memory: "1Gi"}, }), mangleObject: true, - limit: "4Gi", + limits: limits.Limits{{Limit: requireParseQuantity(t, "4Gi")}}, warn: false, create: true, fail: true, statusCode: http.StatusBadRequest, }, - "Warn_ConsiderNewDeployment": { + "Warn_ConsiderNewDeployment_EmptyNS": { + user: "appuio#foo", + namespace: "foo", + resources: []client.Object{}, + object: deploymentFromResources("unfair", "foo", 2, podResource{ + {cpu: "2", memory: "1Gi"}, + }), + limits: limits.Limits{{ + Limit: requireParseQuantity(t, "1Gi"), + }}, + warn: true, + create: true, + }, + "Warn_ConsiderNewDeployment_NodeSelectorFromNamespace": { + user: "appuio#foo", + namespace: "ns-with-default-node-selector", + resources: []client.Object{}, + object: deploymentFromResources("ns-with-default-node-selector", "foo", 2, podResource{ + {cpu: "2", memory: "1Gi"}, + }), + limits: limits.Limits{{ + NodeSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{"class": "plus"}, + }, + Limit: requireParseQuantity(t, "1Gi"), + }}, + warn: true, + create: true, + }, + "Warn_ConsiderNewDeployment_FuzzyMatchNodeSelector": { user: "appuio#foo", namespace: "foo", resources: []client.Object{ podFromResources("pod1", "foo", podResource{ - {cpu: "0", memory: "4Gi"}, + {cpu: "1", memory: "1Gi"}, + }, func(p *corev1.Pod) { + p.Spec.NodeSelector = map[string]string{"app": "true", "class": "foo"} }), }, object: deploymentFromResources("unfair", "foo", 2, podResource{ - {cpu: "1", memory: "1Gi"}, + {cpu: "2", memory: "1Gi"}, + }, func(d *appsv1.Deployment) { + d.Spec.Template.Spec.NodeSelector = map[string]string{"class": "foo"} }), - limit: "4Gi", + limits: limits.Limits{{ + NodeSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "true"}, + }, + Limit: requireParseQuantity(t, "1Gi"), + }}, warn: true, create: true, }, @@ -208,7 +270,7 @@ func TestRatioValidator_Handle(t *testing.T) { resources: []client.Object{}, object: deploymentFromResources("unfair", "foo", 2, podResource{}), mangleObject: true, - limit: "4Gi", + limits: limits.Limits{{Limit: requireParseQuantity(t, "4Gi")}}, warn: false, create: true, fail: true, @@ -225,7 +287,7 @@ func TestRatioValidator_Handle(t *testing.T) { object: statefulsetFromResources("unfair", "foo", 2, podResource{ {cpu: "1", memory: "1Gi"}, }), - limit: "4Gi", + limits: limits.Limits{{Limit: requireParseQuantity(t, "4Gi")}}, warn: true, create: true, }, @@ -235,7 +297,7 @@ func TestRatioValidator_Handle(t *testing.T) { resources: []client.Object{}, object: statefulsetFromResources("unfair", "foo", 2, podResource{}), mangleObject: true, - limit: "4Gi", + limits: limits.Limits{{Limit: requireParseQuantity(t, "4Gi")}}, warn: false, create: true, fail: true, @@ -244,10 +306,12 @@ func TestRatioValidator_Handle(t *testing.T) { } for name, tc := range tests { + name, tc := name, tc t.Run(name, func(t *testing.T) { + t.Parallel() + v := prepareTest(t, tc.resources...) - limit := resource.MustParse(tc.limit) - v.RatioLimit = &limit + v.RatioLimits = tc.limits ar := admission.Request{ AdmissionRequest: admissionv1.AdmissionRequest{ @@ -315,6 +379,8 @@ func TestRatioValidator_Handle(t *testing.T) { } func prepareTest(t *testing.T, initObjs ...client.Object) *RatioValidator { + const defaultNodeSelectorAnnotation = "test.io/node-selector" + scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) @@ -334,13 +400,22 @@ func prepareTest(t *testing.T, initObjs ...client.Object) *RatioValidator { ratio.RatioValidatiorDisableAnnotation: "true", } - initObjs = append(initObjs, testNamespace("foo"), barNs, disabledNs, otherDisabledNs) + nsWithDefaultNodeSelector := testNamespace("ns-with-default-node-selector") + nsWithDefaultNodeSelector.Annotations = map[string]string{ + defaultNodeSelectorAnnotation: "class=plus", + } + + initObjs = append(initObjs, testNamespace("foo"), barNs, disabledNs, otherDisabledNs, nsWithDefaultNodeSelector) client := fake.NewClientBuilder(). WithScheme(scheme). WithObjects(initObjs...). Build() uv := &RatioValidator{ + DefaultNamespaceNodeSelectorAnnotation: defaultNodeSelectorAnnotation, + + Client: failingClient{client}, + Ratio: ratio.Fetcher{ Client: failingClient{client}, }, @@ -357,7 +432,7 @@ func testNamespace(name string) *corev1.Namespace { } } -func podFromResources(name, namespace string, res podResource) *corev1.Pod { +func podFromResources(name, namespace string, res podResource, modify ...func(*corev1.Pod)) *corev1.Pod { p := corev1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", @@ -386,10 +461,15 @@ func podFromResources(name, namespace string, res podResource) *corev1.Pod { } p.Spec.Containers = append(p.Spec.Containers, c) } + + for _, m := range modify { + m(&p) + } + return &p } -func deploymentFromResources(name, namespace string, replicas int32, res podResource) *appsv1.Deployment { +func deploymentFromResources(name, namespace string, replicas int32, res podResource, modify ...func(*appsv1.Deployment)) *appsv1.Deployment { deploy := appsv1.Deployment{ TypeMeta: metav1.TypeMeta{ Kind: "Deployment", @@ -402,9 +482,15 @@ func deploymentFromResources(name, namespace string, replicas int32, res podReso } deploy.Spec.Replicas = &replicas deploy.Spec.Template.Spec.Containers = newTestContainers(res) + + for _, m := range modify { + m(&deploy) + } + return &deploy } -func statefulsetFromResources(name, namespace string, replicas int32, res podResource) *appsv1.StatefulSet { + +func statefulsetFromResources(name, namespace string, replicas int32, res podResource, modify ...func(*appsv1.StatefulSet)) *appsv1.StatefulSet { sts := appsv1.StatefulSet{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", @@ -417,6 +503,11 @@ func statefulsetFromResources(name, namespace string, replicas int32, res podRes } sts.Spec.Replicas = &replicas sts.Spec.Template.Spec.Containers = newTestContainers(res) + + for _, m := range modify { + m(&sts) + } + return &sts } @@ -439,10 +530,6 @@ func newTestContainers(res []containerResources) []corev1.Container { return containers } -type deployResource struct { - containers []containerResources - replicas int32 -} type podResource []containerResources type containerResources struct { cpu string @@ -463,3 +550,10 @@ func (c failingClient) List(ctx context.Context, list client.ObjectList, opts .. } return c.WithWatch.List(ctx, list, opts...) } + +func requireParseQuantity(t *testing.T, s string) *resource.Quantity { + t.Helper() + q, err := resource.ParseQuantity(s) + require.NoError(t, err) + return &q +}