From 0c82cae2383bcca762d4e49816146def0aebc0ff Mon Sep 17 00:00:00 2001 From: Tao Li Date: Thu, 19 May 2022 19:21:28 +0800 Subject: [PATCH] koordlet: support NodeMetricCollectPolicy NodeMetricCollectPolicy is used to configure the aggregation period and reporting period. NodeMetric Controller will synchronize the configuration to NodeMetric from the slo-controller-config configuration. Signed-off-by: Tao Li --- apis/slo/v1alpha1/nodemetric_types.go | 13 +- apis/slo/v1alpha1/zz_generated.deepcopy.go | 32 +++- .../bases/slo.koordinator.sh_nodemetrics.yaml | 15 ++ pkg/koordlet/reporter/reporter.go | 33 +++- pkg/koordlet/reporter/reporter_test.go | 40 +++++ pkg/slo-controller/config/config.go | 30 ++-- pkg/slo-controller/config/config_test.go | 14 +- .../nodemetric/collect_policy.go | 54 ++++++ .../nodemetric/collect_policy_test.go | 160 ++++++++++++++++++ pkg/slo-controller/nodemetric/nodemetric.go | 5 +- .../nodemetric/nodemetric_controller.go | 75 ++++++++ .../nodemetric/nodemetric_controller_test.go | 83 +++++++++ pkg/slo-controller/nodemetric/suite_test.go | 2 +- 13 files changed, 530 insertions(+), 26 deletions(-) create mode 100644 pkg/slo-controller/nodemetric/collect_policy.go create mode 100644 pkg/slo-controller/nodemetric/collect_policy_test.go diff --git a/apis/slo/v1alpha1/nodemetric_types.go b/apis/slo/v1alpha1/nodemetric_types.go index ae2fba092..901ab99e5 100644 --- a/apis/slo/v1alpha1/nodemetric_types.go +++ b/apis/slo/v1alpha1/nodemetric_types.go @@ -39,7 +39,18 @@ type ResourceMap struct { } // NodeMetricSpec defines the desired state of NodeMetric -type NodeMetricSpec struct{} +type NodeMetricSpec struct { + // CollectPolicy defines the Metric collection policy + CollectPolicy *NodeMetricCollectPolicy `json:"metricCollectPolicy,omitempty"` +} + +// NodeMetricCollectPolicy defines the Metric collection policy +type NodeMetricCollectPolicy struct { + // AggregateDurationSeconds represents the aggregation period in seconds + AggregateDurationSeconds 
*int64 `json:"aggregateDurationSeconds,omitempty"` + // ReportIntervalSeconds represents the report period in seconds + ReportIntervalSeconds *int64 `json:"reportIntervalSeconds,omitempty"` +} // NodeMetricStatus defines the observed state of NodeMetric type NodeMetricStatus struct { diff --git a/apis/slo/v1alpha1/zz_generated.deepcopy.go b/apis/slo/v1alpha1/zz_generated.deepcopy.go index eae22ebf5..2cda401f2 100644 --- a/apis/slo/v1alpha1/zz_generated.deepcopy.go +++ b/apis/slo/v1alpha1/zz_generated.deepcopy.go @@ -163,7 +163,7 @@ func (in *NodeMetric) DeepCopyInto(out *NodeMetric) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -185,6 +185,31 @@ func (in *NodeMetric) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeMetricCollectPolicy) DeepCopyInto(out *NodeMetricCollectPolicy) { + *out = *in + if in.AggregateDurationSeconds != nil { + in, out := &in.AggregateDurationSeconds, &out.AggregateDurationSeconds + *out = new(int64) + **out = **in + } + if in.ReportIntervalSeconds != nil { + in, out := &in.ReportIntervalSeconds, &out.ReportIntervalSeconds + *out = new(int64) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeMetricCollectPolicy. +func (in *NodeMetricCollectPolicy) DeepCopy() *NodeMetricCollectPolicy { + if in == nil { + return nil + } + out := new(NodeMetricCollectPolicy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *NodeMetricInfo) DeepCopyInto(out *NodeMetricInfo) { *out = *in @@ -236,6 +261,11 @@ func (in *NodeMetricList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NodeMetricSpec) DeepCopyInto(out *NodeMetricSpec) { *out = *in + if in.CollectPolicy != nil { + in, out := &in.CollectPolicy, &out.CollectPolicy + *out = new(NodeMetricCollectPolicy) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeMetricSpec. diff --git a/config/crd/bases/slo.koordinator.sh_nodemetrics.yaml b/config/crd/bases/slo.koordinator.sh_nodemetrics.yaml index 56d3ac6e7..43fb648f6 100644 --- a/config/crd/bases/slo.koordinator.sh_nodemetrics.yaml +++ b/config/crd/bases/slo.koordinator.sh_nodemetrics.yaml @@ -34,6 +34,21 @@ spec: type: object spec: description: NodeMetricSpec defines the desired state of NodeMetric + properties: + metricCollectPolicy: + description: CollectPolicy defines the Metric collection policy + properties: + aggregateDurationSeconds: + description: AggregateDurationSeconds represents the aggregation + period in seconds + format: int64 + type: integer + reportIntervalSeconds: + description: ReportIntervalSeconds represents the report period + in seconds + format: int64 + type: integer + type: object type: object status: description: NodeMetricStatus defines the observed state of NodeMetric diff --git a/pkg/koordlet/reporter/reporter.go b/pkg/koordlet/reporter/reporter.go index b50672258..74e2bed1b 100644 --- a/pkg/koordlet/reporter/reporter.go +++ b/pkg/koordlet/reporter/reporter.go @@ -29,7 +29,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" clientcorev1 
"k8s.io/client-go/kubernetes/typed/core/v1" @@ -139,9 +138,8 @@ func (r *reporter) Run(stopCh <-chan struct{}) error { return fmt.Errorf("timed out waiting for node metric caches to sync") } - go wait.Until(func() { - r.sync() - }, time.Duration(r.config.ReportIntervalSeconds)*time.Second, stopCh) + go r.syncNodeMetricWorker(stopCh) + } else { klog.Infof("ReportIntervalSeconds is %d, sync node metric to apiserver is disabled", r.config.ReportIntervalSeconds) @@ -153,6 +151,33 @@ func (r *reporter) Run(stopCh <-chan struct{}) error { return nil } +func (r *reporter) syncNodeMetricWorker(stopCh <-chan struct{}) { + reportInterval := r.getNodeMetricReportInterval() + for { + select { + case <-stopCh: + return + case <-time.After(reportInterval): + r.sync() + reportInterval = r.getNodeMetricReportInterval() + } + } +} + +func (r *reporter) getNodeMetricReportInterval() time.Duration { + reportInterval := time.Duration(r.config.ReportIntervalSeconds) * time.Second + nodeMetric, err := r.nodeMetricLister.Get(r.nodeName) + if err == nil && + nodeMetric.Spec.CollectPolicy != nil && + nodeMetric.Spec.CollectPolicy.ReportIntervalSeconds != nil { + interval := *nodeMetric.Spec.CollectPolicy.ReportIntervalSeconds + if interval > 0 { + reportInterval = time.Duration(interval) * time.Second + } + } + return reportInterval +} + func (r *reporter) sync() { if !r.isNodeMetricInited() { klog.Warningf("node metric has not initialized, skip this round.") diff --git a/pkg/koordlet/reporter/reporter_test.go b/pkg/koordlet/reporter/reporter_test.go index 25bcae3ba..47643abbf 100644 --- a/pkg/koordlet/reporter/reporter_test.go +++ b/pkg/koordlet/reporter/reporter_test.go @@ -18,10 +18,29 @@ package reporter import ( "testing" + "time" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/utils/pointer" slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + listerbeta1 "github.com/koordinator-sh/koordinator/pkg/client/listers/slo/v1alpha1" ) +var _ listerbeta1.NodeMetricLister = 
&fakeNodeMetricLister{} + +type fakeNodeMetricLister struct { + nodeMetrics *slov1alpha1.NodeMetric +} + +func (f *fakeNodeMetricLister) List(selector labels.Selector) (ret []*slov1alpha1.NodeMetric, err error) { + return []*slov1alpha1.NodeMetric{f.nodeMetrics}, nil +} + +func (f *fakeNodeMetricLister) Get(name string) (*slov1alpha1.NodeMetric, error) { + return f.nodeMetrics, nil +} + func Test_reporter_isNodeMetricInited(t *testing.T) { type fields struct { nodeMetric *slov1alpha1.NodeMetric @@ -50,3 +69,24 @@ func Test_reporter_isNodeMetricInited(t *testing.T) { }) } } + +func Test_getNodeMetricReportInterval(t *testing.T) { + nodeMetricLister := &fakeNodeMetricLister{ + nodeMetrics: &slov1alpha1.NodeMetric{ + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(666), + }, + }, + }, + } + r := &reporter{ + config: NewDefaultConfig(), + nodeMetricLister: nodeMetricLister, + } + reportInterval := r.getNodeMetricReportInterval() + expectReportInterval := 666 * time.Second + if reportInterval != expectReportInterval { + t.Errorf("expect reportInterval %d but got %d", expectReportInterval, reportInterval) + } +} diff --git a/pkg/slo-controller/config/config.go b/pkg/slo-controller/config/config.go index 4c67e80b7..fc523c26a 100644 --- a/pkg/slo-controller/config/config.go +++ b/pkg/slo-controller/config/config.go @@ -73,12 +73,14 @@ type NodeResourceQoSStrategy struct { } type ColocationStrategy struct { - Enable *bool `json:"enable,omitempty"` - CPUReclaimThresholdPercent *int64 `json:"cpuReclaimThresholdPercent,omitempty"` - MemoryReclaimThresholdPercent *int64 `json:"memoryReclaimThresholdPercent,omitempty"` - DegradeTimeMinutes *int64 `json:"degradeTimeMinutes,omitempty"` - UpdateTimeThresholdSeconds *int64 `json:"updateTimeThresholdSeconds,omitempty"` - ResourceDiffThreshold *float64 `json:"resourceDiffThreshold,omitempty"` + Enable *bool `json:"enable,omitempty"` + 
MetricAggregateDurationSeconds *int64 `json:"metricAggregateDurationSeconds,omitempty"` + MetricReportIntervalSeconds *int64 `json:"metricReportIntervalSeconds,omitempty"` + CPUReclaimThresholdPercent *int64 `json:"cpuReclaimThresholdPercent,omitempty"` + MemoryReclaimThresholdPercent *int64 `json:"memoryReclaimThresholdPercent,omitempty"` + DegradeTimeMinutes *int64 `json:"degradeTimeMinutes,omitempty"` + UpdateTimeThresholdSeconds *int64 `json:"updateTimeThresholdSeconds,omitempty"` + ResourceDiffThreshold *float64 `json:"resourceDiffThreshold,omitempty"` } func NewDefaultColocationCfg() *ColocationCfg { @@ -94,17 +96,21 @@ func DefaultColocationCfg() ColocationCfg { func DefaultColocationStrategy() ColocationStrategy { return ColocationStrategy{ - Enable: pointer.Bool(false), - CPUReclaimThresholdPercent: pointer.Int64(60), - MemoryReclaimThresholdPercent: pointer.Int64(65), - DegradeTimeMinutes: pointer.Int64(15), - UpdateTimeThresholdSeconds: pointer.Int64(300), - ResourceDiffThreshold: pointer.Float64(0.1), + Enable: pointer.Bool(false), + MetricAggregateDurationSeconds: pointer.Int64(30), + MetricReportIntervalSeconds: pointer.Int64(60), + CPUReclaimThresholdPercent: pointer.Int64(60), + MemoryReclaimThresholdPercent: pointer.Int64(65), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), } } func IsColocationStrategyValid(strategy *ColocationStrategy) bool { return strategy != nil && + (strategy.MetricAggregateDurationSeconds == nil || *strategy.MetricAggregateDurationSeconds > 0) && + (strategy.MetricReportIntervalSeconds == nil || *strategy.MetricReportIntervalSeconds > 0) && (strategy.CPUReclaimThresholdPercent == nil || *strategy.CPUReclaimThresholdPercent > 0) && (strategy.MemoryReclaimThresholdPercent == nil || *strategy.MemoryReclaimThresholdPercent > 0) && (strategy.DegradeTimeMinutes == nil || *strategy.DegradeTimeMinutes > 0) && diff --git
a/pkg/slo-controller/config/config_test.go b/pkg/slo-controller/config/config_test.go index 6aaadd132..d2f02b31f 100644 --- a/pkg/slo-controller/config/config_test.go +++ b/pkg/slo-controller/config/config_test.go @@ -170,12 +170,14 @@ func Test_GetNodeColocationStrategy(t *testing.T) { }, }, want: &ColocationStrategy{ - Enable: pointer.BoolPtr(true), - CPUReclaimThresholdPercent: pointer.Int64Ptr(60), - MemoryReclaimThresholdPercent: pointer.Int64Ptr(65), - DegradeTimeMinutes: pointer.Int64Ptr(15), - UpdateTimeThresholdSeconds: pointer.Int64Ptr(300), - ResourceDiffThreshold: pointer.Float64Ptr(0.1), + Enable: pointer.BoolPtr(true), + MetricAggregateDurationSeconds: pointer.Int64Ptr(30), + MetricReportIntervalSeconds: pointer.Int64Ptr(60), + CPUReclaimThresholdPercent: pointer.Int64Ptr(60), + MemoryReclaimThresholdPercent: pointer.Int64Ptr(65), + DegradeTimeMinutes: pointer.Int64Ptr(15), + UpdateTimeThresholdSeconds: pointer.Int64Ptr(300), + ResourceDiffThreshold: pointer.Float64Ptr(0.1), }, }, { diff --git a/pkg/slo-controller/nodemetric/collect_policy.go b/pkg/slo-controller/nodemetric/collect_policy.go new file mode 100644 index 000000000..ec3aaf2ab --- /dev/null +++ b/pkg/slo-controller/nodemetric/collect_policy.go @@ -0,0 +1,54 @@ +/* + Copyright 2022 The Koordinator Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package nodemetric + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/slo-controller/config" + "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource" +) + +func getNodeMetricCollectPolicy(node *corev1.Node, colocationConfig *noderesource.Config) (*slov1alpha1.NodeMetricCollectPolicy, error) { + colocationConfig.RLock() + defer colocationConfig.RUnlock() + + strategy := config.GetNodeColocationStrategy(&colocationConfig.ColocationCfg, node) + if strategy == nil { + strategy = &colocationConfig.ColocationStrategy + } + if strategy == nil { + return nil, fmt.Errorf("failed to find satisfied strategy") + } + + if !config.IsColocationStrategyValid(strategy) { + return nil, fmt.Errorf("invalid colocationConfig") + } + + if strategy.Enable == nil || !*strategy.Enable { + return nil, fmt.Errorf("colocationConfig disabled") + } + + collectPolicy := &slov1alpha1.NodeMetricCollectPolicy{ + AggregateDurationSeconds: strategy.MetricAggregateDurationSeconds, + ReportIntervalSeconds: strategy.MetricReportIntervalSeconds, + } + return collectPolicy, nil +} diff --git a/pkg/slo-controller/nodemetric/collect_policy_test.go b/pkg/slo-controller/nodemetric/collect_policy_test.go new file mode 100644 index 000000000..882ca37a5 --- /dev/null +++ b/pkg/slo-controller/nodemetric/collect_policy_test.go @@ -0,0 +1,160 @@ +/* + Copyright 2022 The Koordinator Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ + +package nodemetric + +import ( + "reflect" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/slo-controller/config" + "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource" +) + +func Test_getNodeMetricCollectPolicy(t *testing.T) { + tests := []struct { + name string + node *corev1.Node + config *config.ColocationCfg + want *slov1alpha1.NodeMetricCollectPolicy + wantErr bool + }{ + { + name: "empty config", + node: &corev1.Node{}, + want: nil, + wantErr: true, + }, + { + name: "config disabled", + node: &corev1.Node{}, + config: &config.ColocationCfg{ + ColocationStrategy: config.ColocationStrategy{ + Enable: pointer.Bool(false), + }, + }, + want: nil, + wantErr: true, + }, + { + name: "cluster policy", + node: &corev1.Node{}, + config: &config.ColocationCfg{ + ColocationStrategy: config.ColocationStrategy{ + Enable: pointer.Bool(true), + MetricAggregateDurationSeconds: pointer.Int64(60), + MetricReportIntervalSeconds: pointer.Int64(180), + }, + }, + want: &slov1alpha1.NodeMetricCollectPolicy{ + AggregateDurationSeconds: pointer.Int64(60), + ReportIntervalSeconds: pointer.Int64(180), + }, + }, + { + name: "cluster policy and node policy but use cluster", + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "test": "true", + }, + }, + }, + config: &config.ColocationCfg{ + ColocationStrategy: config.ColocationStrategy{ + Enable: pointer.Bool(true), + + MetricAggregateDurationSeconds: pointer.Int64(30), + MetricReportIntervalSeconds: pointer.Int64(180), + }, + NodeConfigs: []config.NodeColocationCfg{ + { + NodeSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "test": "false", + }, + }, + ColocationStrategy: 
config.ColocationStrategy{ + Enable: pointer.Bool(true), + MetricAggregateDurationSeconds: pointer.Int64(15), + MetricReportIntervalSeconds: pointer.Int64(30), + }, + }, + }, + }, + want: &slov1alpha1.NodeMetricCollectPolicy{ + AggregateDurationSeconds: pointer.Int64(30), + ReportIntervalSeconds: pointer.Int64(180), + }, + }, + { + name: "node policy", + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "test": "false", + }, + }, + }, + config: &config.ColocationCfg{ + ColocationStrategy: config.ColocationStrategy{ + Enable: pointer.Bool(true), + MetricAggregateDurationSeconds: pointer.Int64(30), + MetricReportIntervalSeconds: pointer.Int64(180), + }, + NodeConfigs: []config.NodeColocationCfg{ + { + NodeSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "test": "false", + }, + }, + ColocationStrategy: config.ColocationStrategy{ + Enable: pointer.Bool(true), + MetricAggregateDurationSeconds: pointer.Int64(15), + MetricReportIntervalSeconds: pointer.Int64(30), + }, + }, + }, + }, + want: &slov1alpha1.NodeMetricCollectPolicy{ + AggregateDurationSeconds: pointer.Int64(15), + ReportIntervalSeconds: pointer.Int64(30), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var resourceConfig noderesource.Config + if tt.config != nil { + resourceConfig.ColocationCfg = *tt.config + } + got, err := getNodeMetricCollectPolicy(tt.node, &resourceConfig) + if (err != nil) != tt.wantErr { + t.Errorf("getNodeMetricCollectPolicy() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getNodeMetricCollectPolicy() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/slo-controller/nodemetric/nodemetric.go b/pkg/slo-controller/nodemetric/nodemetric.go index ffdef94ee..1cd1bd1c3 100644 --- a/pkg/slo-controller/nodemetric/nodemetric.go +++ b/pkg/slo-controller/nodemetric/nodemetric.go @@ -31,7 +31,10 @@ func (r *NodeMetricReconciler) 
initNodeMetric(node *corev1.Node, nodeMetric *slo return fmt.Errorf("both Node and NodeMetric should not be empty") } - nodeMetricSpec := &slov1alpha1.NodeMetricSpec{} + nodeMetricSpec, err := r.getNodeMetricSpec(node, nil) + if err != nil { + return err + } nodeMetricStatus := &slov1alpha1.NodeMetricStatus{ UpdateTime: &metav1.Time{Time: time.Now()}, diff --git a/pkg/slo-controller/nodemetric/nodemetric_controller.go b/pkg/slo-controller/nodemetric/nodemetric_controller.go index abbd253d7..7691f56f2 100644 --- a/pkg/slo-controller/nodemetric/nodemetric_controller.go +++ b/pkg/slo-controller/nodemetric/nodemetric_controller.go @@ -18,23 +18,31 @@ package nodemetric import ( "context" + "fmt" + "reflect" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/slo-controller/config" + "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource" ) // NodeMetricReconciler reconciles a NodeMetric object type NodeMetricReconciler struct { client.Client Scheme *runtime.Scheme + // TODO: Separate Config from noderesource package + config noderesource.Config } // +kubebuilder:rbac:groups=slo.koordinator.sh,resources=nodemetrics,verbs=get;list;watch;create;update;patch;delete @@ -89,15 +97,82 @@ func (r *NodeMetricReconciler) Reconcile(ctx context.Context, req ctrl.Request) klog.Errorf("failed to create nodeMetric %v, error: %v", nodeMetricName, err) return ctrl.Result{Requeue: true}, err } + } else { + // update nodeMetric spec if both exists + nodeMetricSpec, err := r.getNodeMetricSpec(node, &nodeMetric.Spec) + if err != nil { + 
klog.Errorf("syncNodeMetric failed to get nodeMetric spec: %v", err) + return reconcile.Result{Requeue: true}, err + } + if !reflect.DeepEqual(nodeMetricSpec, &nodeMetric.Spec) { + nodeMetric.Spec = *nodeMetricSpec + err = r.Client.Update(context.TODO(), nodeMetric) + if err != nil { + klog.Errorf("syncNodeMetric failed to update nodeMetric %v, error: %v", nodeMetricName, err) + return reconcile.Result{Requeue: true}, err + } + } } return ctrl.Result{}, nil } +func (r *NodeMetricReconciler) getNodeMetricSpec(node *corev1.Node, oldSpec *slov1alpha1.NodeMetricSpec) (*slov1alpha1.NodeMetricSpec, error) { + // get cr's spec from the configmap + // if the configmap does not exist, use the default + if r.Client == nil { + klog.Errorf("getNodeMetricSpec failed to load configmap %s/%s", + config.ConfigNameSpace, config.SLOCtrlConfigMap) + return nil, fmt.Errorf("no available client") + } + + var nodeMetricSpec *slov1alpha1.NodeMetricSpec + if oldSpec != nil { + nodeMetricSpec = oldSpec.DeepCopy() + } else { + defaultColocationCfg := config.NewDefaultColocationCfg() + nodeMetricSpec = &slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + AggregateDurationSeconds: defaultColocationCfg.MetricAggregateDurationSeconds, + ReportIntervalSeconds: defaultColocationCfg.MetricReportIntervalSeconds, + }, + } + } + + // TODO: record an event about the failure reason on configmap/crd when failed to load the config + configMap := &corev1.ConfigMap{} + keyTypes := types.NamespacedName{Namespace: config.ConfigNameSpace, Name: config.SLOCtrlConfigMap} + if err := r.Client.Get(context.TODO(), keyTypes, configMap); err != nil { + // default when the configmap does not exist + if errors.IsNotFound(err) { + klog.Infof("getNodeMetricSpec(): config map %s/%s not exist, err:%s", config.ConfigNameSpace, + config.SLOCtrlConfigMap, err) + return nodeMetricSpec, nil + } + // abort spec update if cannot get configmap + klog.Errorf("getNodeMetricSpec(): failed to load 
config map %s/%s, err:%s", config.ConfigNameSpace, + config.SLOCtrlConfigMap, err) + return nil, err + } + + nodeMetricCollectPolicy, err := getNodeMetricCollectPolicy(node, &r.config) + if err != nil { + klog.Warningf("getNodeMetricSpec(): failed to get nodeMetricCollectPolicy for node %s, set the default error: %v", node.Name, err) + } else { + nodeMetricSpec.CollectPolicy = nodeMetricCollectPolicy + } + + return nodeMetricSpec, nil +} + // SetupWithManager sets up the controller with the Manager. func (r *NodeMetricReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&slov1alpha1.NodeMetric{}). Watches(&source.Kind{Type: &corev1.Node{}}, &EnqueueRequestForNode{}). + Watches(&source.Kind{Type: &corev1.ConfigMap{}}, &noderesource.EnqueueRequestForConfigMap{ + Config: &r.config, + Client: r.Client, + }). Complete(r) } diff --git a/pkg/slo-controller/nodemetric/nodemetric_controller_test.go b/pkg/slo-controller/nodemetric/nodemetric_controller_test.go index 62fcd5766..3adfd7f69 100644 --- a/pkg/slo-controller/nodemetric/nodemetric_controller_test.go +++ b/pkg/slo-controller/nodemetric/nodemetric_controller_test.go @@ -18,6 +18,8 @@ package nodemetric import ( "context" + "encoding/json" + "fmt" "time" . 
"github.com/onsi/ginkgo" @@ -27,8 +29,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" + "k8s.io/utils/pointer" slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/slo-controller/config" ) var _ = Describe("NodeMetric Controller", func() { @@ -85,6 +89,85 @@ var _ = Describe("NodeMetric Controller", func() { }, timeout, interval).Should(BeTrue()) }) }) + + Context("Update NodeMetric through ConfigMap", func() { + It("Should update NodeMetricCollectPolicy", func() { + ctx := context.Background() + node := newNodeForTest(nodeName) + + key := types.NamespacedName{Name: node.Name} + createdNodeMetric := &slov1alpha1.NodeMetric{} + + By(fmt.Sprintf("create node %s", key)) + Expect(k8sClient.Create(ctx, node)).Should(Succeed()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, key, createdNodeMetric) + if err != nil { + klog.Errorf("failed to get the corresponding NodeMetric, error: %v", err) + } + return err == nil + }, timeout, interval).Should(BeTrue()) + + ns := &corev1.Namespace{} + err := k8sClient.Get(ctx, types.NamespacedName{Name: config.ConfigNameSpace}, ns) + if errors.IsNotFound(err) { + By(fmt.Sprintf("create config namespace %s", config.ConfigNameSpace)) + ns.Name = config.ConfigNameSpace + Expect(k8sClient.Create(ctx, ns)).Should(Succeed()) + } else { + Fail(fmt.Sprintf("Failed to get namespace %s, err: %v", config.ConfigNameSpace, err)) + } + + By("create slo-controller-config with nodeMetricCollectPolicyCfg") + policyConfig := &config.ColocationCfg{ + ColocationStrategy: config.ColocationStrategy{ + Enable: pointer.Bool(true), + MetricAggregateDurationSeconds: pointer.Int64(60), + MetricReportIntervalSeconds: pointer.Int64(180), + }, + } + data, err := json.Marshal(policyConfig) + Expect(err).Should(Succeed()) + + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: config.ConfigNameSpace, + Name: 
config.SLOCtrlConfigMap, + }, + Data: map[string]string{ + config.ColocationConfigKey: string(data), + }, + } + Expect(k8sClient.Create(ctx, configMap)).Should(Succeed()) + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: config.SLOCtrlConfigMap, Namespace: config.ConfigNameSpace}, configMap)).Should(Succeed()) + + By("update nodeMetric status to trigger update nodeMetricSpec") + Eventually(func() bool { + key := types.NamespacedName{Name: node.Name} + nodeMetric := &slov1alpha1.NodeMetric{} + err := k8sClient.Get(ctx, key, nodeMetric) + if err != nil { + klog.Errorf("failed to get the corresponding NodeMetric, error: %v", err) + return false + } + if nodeMetric.Spec.CollectPolicy != nil && + nodeMetric.Spec.CollectPolicy.ReportIntervalSeconds != nil && + *nodeMetric.Spec.CollectPolicy.ReportIntervalSeconds == 180 { + return true + } + + nodeMetric.Status.UpdateTime = &metav1.Time{ + Time: time.Now(), + } + err = k8sClient.Update(ctx, nodeMetric) + if err != nil { + klog.Errorf("failed to update nodeMetric, err: %v", err) + } + return false + }, timeout, interval).Should(BeTrue()) + }) + }) }) func newNodeForTest(name string) *corev1.Node { diff --git a/pkg/slo-controller/nodemetric/suite_test.go b/pkg/slo-controller/nodemetric/suite_test.go index fc0c41a8c..7dbc51a66 100644 --- a/pkg/slo-controller/nodemetric/suite_test.go +++ b/pkg/slo-controller/nodemetric/suite_test.go @@ -83,7 +83,7 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) - r := &NodeMetricReconciler{mgr.GetClient(), mgr.GetScheme()} + r := &NodeMetricReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()} Expect(r.SetupWithManager(mgr)).ToNot(HaveOccurred())