From 51a03047bf39ad45a16e7e3d393242b5a79229fc Mon Sep 17 00:00:00 2001 From: zhy76 <958474674@qq.com> Date: Tue, 3 Oct 2023 13:06:08 +0000 Subject: [PATCH] feat: monitoring latency of reported information in KCNR Signed-off-by: zhy76 <958474674@qq.com> --- cmd/katalyst-controller/app/controller/cnr.go | 2 +- pkg/controller/cnr/cnr.go | 98 ++++++--- pkg/controller/cnr/cnr_indicator.go | 51 ++++- pkg/controller/cnr/cnr_indicator_test.go | 83 +++++++- pkg/controller/cnr/cnr_test.go | 187 ++++++++++++++++++ 5 files changed, 385 insertions(+), 36 deletions(-) create mode 100644 pkg/controller/cnr/cnr_test.go diff --git a/cmd/katalyst-controller/app/controller/cnr.go b/cmd/katalyst-controller/app/controller/cnr.go index 1bb338fc0b..deb1dfb5d5 100644 --- a/cmd/katalyst-controller/app/controller/cnr.go +++ b/cmd/katalyst-controller/app/controller/cnr.go @@ -42,7 +42,7 @@ func StartCNRController(ctx context.Context, controlCtx *katalystbase.GenericCon conf.GenericConfiguration, conf.GenericControllerConfiguration, controlCtx.Client, - // controlCtx.KubeInformerFactory.Core().V1().Pods(), + controlCtx.KubeInformerFactory.Core().V1().Pods(), controlCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), controlCtx.EmitterPool.GetDefaultMetricsEmitter(), ) diff --git a/pkg/controller/cnr/cnr.go b/pkg/controller/cnr/cnr.go index b2d46fd77b..3ae1ae8083 100644 --- a/pkg/controller/cnr/cnr.go +++ b/pkg/controller/cnr/cnr.go @@ -21,8 +21,11 @@ import ( "fmt" "time" + corev1 "k8s.io/api/core/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/cri-api/pkg/errors" @@ -51,16 +54,17 @@ type CNRController struct { cnrListerSynced cache.InformerSynced cnrLister listers.CustomNodeResourceLister - // podListerSynced cache.InformerSynced - // podLister
corelisters.PodLister + podListerSynced cache.InformerSynced + podLister corelisters.PodLister // queue for cnr cnrSyncQueue workqueue.RateLimitingInterface - // // queue for pod - // podSyncQueue workqueue.RateLimitingInterface // metricsEmitter for emit metrics metricsEmitter metrics.MetricEmitter + + // podTimeMap for record pod scheduled time + podTimeMap map[string]time.Time } // NewCNRController create a new CNRController @@ -69,7 +73,7 @@ func NewCNRController( genericConf *generic.GenericConfiguration, _ *controller.GenericControllerConfiguration, client *client.GenericClientSet, - // podInformer coreinformers.PodInformer, + podInformer coreinformers.PodInformer, cnrInformer informers.CustomNodeResourceInformer, metricsEmitter metrics.MetricEmitter) (*CNRController, error) { @@ -77,7 +81,7 @@ func NewCNRController( ctx: ctx, client: client, cnrSyncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cnrControllerName), - // podSyncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod"), + podTimeMap: make(map[string]time.Time), } // init cnr informer @@ -91,14 +95,13 @@ func NewCNRController( cnrController.cnrListerSynced = cnrInformer.Informer().HasSynced // init pod informer - // podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - // AddFunc: cnrController.addPodEventHandler, - // UpdateFunc: cnrController.updatePodEventHandler, - // }) - // // init pod lister - // cnrController.podLister = podInformer.Lister() - // // init pod synced - // cnrController.podListerSynced = podInformer.Informer().HasSynced + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: cnrController.updatePodEventHandler, + }) + // init pod lister + cnrController.podLister = podInformer.Lister() + // init pod synced + cnrController.podListerSynced = podInformer.Informer().HasSynced if metricsEmitter == nil { // if metricsEmitter is nil, use dummy metrics @@ 
-121,14 +124,14 @@ func NewCNRController( func (ctrl *CNRController) Run() { defer utilruntime.HandleCrash() defer ctrl.cnrSyncQueue.ShutDown() - // defer ctrl.podSyncQueue.ShutDown() defer klog.Infof("Shutting down %s controller", cnrControllerName) - // wait for cache sync - if !cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced) { + // wait for cnr cache sync + if !cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced, ctrl.podListerSynced) { utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s controller", cnrControllerName)) return } + klog.Infof("Caches are synced for %s controller", cnrControllerName) klog.Infof("start %d workers for %s controller", cnrWorkerCount, cnrControllerName) @@ -136,6 +139,10 @@ func (ctrl *CNRController) Run() { go wait.Until(ctrl.cnrWorker, time.Second, ctrl.ctx.Done()) } + // gc podTimeMap + klog.Infof("start gc podTimeMap...") + go wait.Until(ctrl.gcPodTimeMap, 1*time.Minute, ctrl.ctx.Done()) + <-ctrl.ctx.Done() } @@ -216,6 +223,11 @@ func (ctrl *CNRController) syncCNR(key string) error { // enqueueCNR enqueues the given CNR in the work queue. 
func (ctrl *CNRController) enqueueCNR(cnr *apis.CustomNodeResource) { + if cnr == nil { + klog.Warning("trying to enqueue a nil cnr") + return + } + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(cnr) if err != nil { utilruntime.HandleError(fmt.Errorf("Cound't get key for CNR %+v: %v", cnr, err)) @@ -228,7 +240,7 @@ func (ctrl *CNRController) addCNREventHandler(obj interface{}) { klog.Infof("CNR create event found...") cnr, ok := obj.(*apis.CustomNodeResource) if !ok { - klog.Errorf("cnanot convert obj to *apis.CNR: %v", obj) + klog.Errorf("cannot convert obj to *apis.CNR") return } klog.V(4).Infof("notice addition of cnr %s", cnr.Name) @@ -238,20 +250,50 @@ func (ctrl *CNRController) addCNREventHandler(obj interface{}) { func (ctrl *CNRController) updateCNREventHandler(_, newObj interface{}) { klog.Infof("CNR update event found...") - c, ok := newObj.(*apis.CustomNodeResource) + cnr, ok := newObj.(*apis.CustomNodeResource) if !ok { - klog.Errorf("cannot convert newObj to *apis.CNR: %v", c) + klog.Errorf("cannot convert newObj to *apis.CNR") return } - klog.V(4).Infof("notice addition of cnr %s", c.Name) + klog.V(4).Infof("notice update of cnr %s", cnr.Name) - ctrl.enqueueCNR(c) + // emit cnr pod report lantency metric + klog.Infof("Emit CNR Report Lantency metric...") + err := ctrl.emitCNRReportLantencyMetric(cnr) + if err != nil { + klog.Errorf("emit cnr report lantency metric failed: %v", err) + } + + ctrl.enqueueCNR(cnr) } -// func (ctrl *CNRController) addPodEventHandler(obj interface{}) { -// ctrl.podSyncQueue.Add(obj) -// } +func (ctrl *CNRController) updatePodEventHandler(oldObj, newObj interface{}) { + klog.Infof("Pod update event found...") + oldPod, ok := oldObj.(*corev1.Pod) + if !ok { + klog.Errorf("cannot convert oldObj to Pod") + return + } + newPod, ok := newObj.(*corev1.Pod) + if !ok { + klog.Errorf("cannot convert newObj to Pod") + return + } + if oldPod.Spec.NodeName == "" && newPod.Spec.NodeName != "" { + klog.Infof("notice pod: %v 
scheduled to node: %v", newPod.Name, newPod.Spec.NodeName) + // record pod scheduled time + ctrl.podTimeMap[string(newPod.UID)] = time.Now() + } +} -// func (ctrl *CNRController) updatePodEventHandler(oldObj, newObj interface{}) { -// ctrl.podSyncQueue.Add(newObj) -// } +// gcPodTimeMap gc podTimeMap which over 1 minutes not used +func (ctrl *CNRController) gcPodTimeMap() { + klog.Infof("gc podTimeMap...") + for key, value := range ctrl.podTimeMap { + notUsedTime := time.Now().Sub(value) + if notUsedTime > time.Minute { + klog.Infof("gc podTimeMap: %v, which not used over %v minutes", key, notUsedTime.Minutes()) + delete(ctrl.podTimeMap, key) + } + } +} diff --git a/pkg/controller/cnr/cnr_indicator.go b/pkg/controller/cnr/cnr_indicator.go index 58d47fda39..84bb05220d 100644 --- a/pkg/controller/cnr/cnr_indicator.go +++ b/pkg/controller/cnr/cnr_indicator.go @@ -17,6 +17,8 @@ limitations under the License. package cnr import ( + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" @@ -28,8 +30,8 @@ import ( ) const ( - metricsNameCNRReportAnomaly = "cnr_report_anomaly" - // metricsNameCNRReportLantency = "cnr_report_latency" + metricsNameCNRReportAnomaly = "cnr_report_anomaly" + metricsNameCNRReportLantency = "cnr_report_latency" ) const ( @@ -136,10 +138,10 @@ func (ctrl *CNRController) emitCNRAnomalyMetric(cnr *v1alpha1.CustomNodeResource _ = ctrl.metricsEmitter.StoreInt64(metricsNameCNRReportAnomaly, 1, metrics.MetricTypeNameRaw, metrics.MetricTag{ - Key: "nodeIp", Val: node.Status.Addresses[0].Address, + Key: "node_ip", Val: node.Status.Addresses[0].Address, }, metrics.MetricTag{ - Key: "nodeName", Val: cnr.Name, + Key: "node_name", Val: cnr.Name, }, metrics.MetricTag{ Key: "reason", Val: reason, @@ -148,3 +150,44 @@ func (ctrl *CNRController) emitCNRAnomalyMetric(cnr *v1alpha1.CustomNodeResource return nil } + +// emitCNRReportLantencyMetric emit CNR report lantency metric +func (ctrl *CNRController) emitCNRReportLantencyMetric(cnr 
*v1alpha1.CustomNodeResource) error { + for _, socket := range cnr.Status.TopologyZone { + for _, numa := range socket.Children { + if numa.Type != v1alpha1.TopologyTypeNuma { + // only check numa + continue + } + for _, allocation := range numa.Allocations { + key := allocation.Consumer + namespace, podName, uid, err := native.ParseUniqObjectUIDKey(key) + if err != nil { + klog.Errorf("[CNRReportLantency] failed to parse uniq object uid key %s", key) + } + if _, ok := ctrl.podTimeMap[uid]; !ok { + continue + } + if ctrl.podTimeMap[uid].IsZero() { + continue + } + klog.Infof("[CNRReportLantency] pod %s/%s/%s report lantency: %dms", namespace, podName, uid, time.Since(ctrl.podTimeMap[uid]).Milliseconds()) + _ = ctrl.metricsEmitter.StoreFloat64(metricsNameCNRReportLantency, float64(time.Since(ctrl.podTimeMap[uid]).Milliseconds()), + metrics.MetricTypeNameRaw, + metrics.MetricTag{ + Key: "namespace", Val: namespace, + }, + metrics.MetricTag{ + Key: "pod_name", Val: podName, + }, + metrics.MetricTag{ + Key: "pod_uid", Val: uid, + }, + ) + // delete the used data from podTimeMap + delete(ctrl.podTimeMap, uid) + } + } + } + return nil +} diff --git a/pkg/controller/cnr/cnr_indicator_test.go b/pkg/controller/cnr/cnr_indicator_test.go index 81dcd11128..1731f824db 100644 --- a/pkg/controller/cnr/cnr_indicator_test.go +++ b/pkg/controller/cnr/cnr_indicator_test.go @@ -19,6 +19,7 @@ package cnr import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -307,7 +308,7 @@ func Test_checkNumaExclusiveAnomaly(t *testing.T) { conf.GenericConfiguration, conf.GenericControllerConfiguration, genericCtx.Client, - // genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.KubeInformerFactory.Core().V1().Pods(), genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), ) @@ -483,7 +484,7 @@ func 
Test_checkNumaAllocatableSumAnomaly(t *testing.T) { conf.GenericConfiguration, conf.GenericControllerConfiguration, genericCtx.Client, - // genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.KubeInformerFactory.Core().V1().Pods(), genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), ) @@ -620,7 +621,7 @@ func Test_checkPodAllocationSumAnomaly(t *testing.T) { conf.GenericConfiguration, conf.GenericControllerConfiguration, genericCtx.Client, - // genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.KubeInformerFactory.Core().V1().Pods(), genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), ) @@ -631,3 +632,79 @@ func Test_checkPodAllocationSumAnomaly(t *testing.T) { }) } } + +func Test_emitCNRReportLantencyMetric(t *testing.T) { + t.Parallel() + + cnr := &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1alpha1.CustomNodeResourceStatus{ + TopologyZone: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNIC, + Attributes: []v1alpha1.Attribute{ + { + Name: "katalyst.kubewharf.io/resource_identifier", + Value: "enp0s3", + }, + }, + Name: "eth0", + }, + { + Type: v1alpha1.TopologyTypeNuma, + Allocations: []*v1alpha1.Allocation{ + { + Consumer: "test-namespace/test-pod1/1111111111", + Requests: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + }, + Resources: v1alpha1.Resources{ + Allocatable: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), 
resource.BinarySI), + }, + }, + Name: "0", + }, + }, + }, + }, + }, + } + + genericCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{}, []runtime.Object{cnr}) + assert.NoError(t, err) + + conf, err := options.NewOptions().Config() + require.NoError(t, err) + require.NotNil(t, conf) + + ctrl, err := NewCNRController( + context.Background(), + conf.GenericConfiguration, + conf.GenericControllerConfiguration, + genericCtx.Client, + genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), + metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), + ) + ctrl.podTimeMap = map[string]time.Time{ + "1111111111": time.Now(), + } + time.Sleep(2 * time.Millisecond) + assert.NoError(t, err) + + err = ctrl.emitCNRReportLantencyMetric(cnr) + assert.NoError(t, err) + if _, ok := ctrl.podTimeMap["1111111111"]; ok { + t.Errorf("podTimeMap should not have key 1111111111") + } +} diff --git a/pkg/controller/cnr/cnr_test.go b/pkg/controller/cnr/cnr_test.go new file mode 100644 index 0000000000..a30561d4ff --- /dev/null +++ b/pkg/controller/cnr/cnr_test.go @@ -0,0 +1,187 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cnr + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" + + "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" + metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool" +) + +func TestCNR_Run(t *testing.T) { + t.Parallel() + + var ( + oldPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "uid1", + }, + Spec: corev1.PodSpec{ + NodeName: "", + }, + } + ) + + type fields struct { + pod *corev1.Pod + cnr *v1alpha1.CustomNodeResource + } + tests := []struct { + name string + fields fields + }{ + { + name: "test-pod-nodename-updated", + fields: fields{ + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "uid1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + }, + cnr: &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Labels: map[string]string{ + "test": "test", + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + genericCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{oldPod}, []runtime.Object{tt.fields.cnr}) + assert.NoError(t, err) + + conf, err := options.NewOptions().Config() + require.NoError(t, err) + require.NotNil(t, conf) + + ctrl, err := NewCNRController( + context.Background(), + conf.GenericConfiguration, + conf.GenericControllerConfiguration, + genericCtx.Client, + genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), + 
metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), + ) + assert.NoError(t, err) + + // test cache synced + genericCtx.KubeInformerFactory.Start(ctrl.ctx.Done()) + genericCtx.InternalInformerFactory.Start(ctrl.ctx.Done()) + go ctrl.Run() + + cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced, ctrl.podListerSynced) + time.Sleep(100 * time.Millisecond) + + gotCNR, err := ctrl.cnrLister.Get(tt.fields.cnr.Name) + assert.NoError(t, err) + assert.Equal(t, tt.fields.cnr, gotCNR) + + gotPod, err := ctrl.podLister.Pods(oldPod.Namespace).Get(oldPod.Name) + assert.NoError(t, err) + assert.Equal(t, oldPod, gotPod) + + // test schedule pod to node + _, err = ctrl.client.KubeClient.CoreV1().Pods(tt.fields.pod.Namespace).Update(ctrl.ctx, tt.fields.pod, metav1.UpdateOptions{}) + assert.NoError(t, err) + time.Sleep(100 * time.Millisecond) + ctrl.podTimeMap[string(tt.fields.pod.UID)] = time.Now() + }) + } +} + +func Test_gcPodTimeMap(t *testing.T) { + t.Parallel() + + var ( + time1 = time.Now().Add(-time.Hour) + time2 = time.Now().Add(-2 * time.Minute) + time3 = time.Now().Add(-time.Second) + ) + type args struct { + podTimeMap map[string]time.Time + } + tests := []struct { + name string + args args + want map[string]time.Time + }{ + { + name: "test-gc", + args: args{ + podTimeMap: map[string]time.Time{ + "pod1": time1, + "pod2": time2, + "pod3": time3, + }, + }, + want: map[string]time.Time{ + "pod3": time3, + }, + }, + { + name: "test-no-gc", + args: args{ + podTimeMap: map[string]time.Time{ + "pod1": time3, + }, + }, + want: map[string]time.Time{ + "pod1": time3, + }, + }, + { + name: "test-empty", + args: args{ + podTimeMap: map[string]time.Time{}, + }, + want: map[string]time.Time{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := &CNRController{ + podTimeMap: tt.args.podTimeMap, + } + ctrl.gcPodTimeMap() + assert.Equal(t, tt.want, ctrl.podTimeMap) + }) + } +}