diff --git a/cmd/katalyst-controller/app/controller/monitor.go b/cmd/katalyst-controller/app/controller/monitor.go new file mode 100644 index 000000000..a36c0a50c --- /dev/null +++ b/cmd/katalyst-controller/app/controller/monitor.go @@ -0,0 +1,62 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + + "k8s.io/klog/v2" + + katalystbase "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/controller/monitor" +) + +const ( + MonitorControllerName = "monitor" +) + +func StartMonitorController(ctx context.Context, controlCtx *katalystbase.GenericContext, + conf *config.Configuration, _ interface{}, _ string) (bool, error) { + var ( + cnrMonitorController *monitor.CNRMonitorController + err error + ) + + if conf.MonitorConfig.EnableCNRMonitor { + cnrMonitorController, err = monitor.NewCNRMonitorController(ctx, + conf.GenericConfiguration, + conf.GenericControllerConfiguration, + conf.CNRMonitorConfig, + controlCtx.Client, + controlCtx.KubeInformerFactory.Core().V1().Nodes(), + controlCtx.KubeInformerFactory.Core().V1().Pods(), + controlCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), + controlCtx.EmitterPool.GetDefaultMetricsEmitter(), + ) + if err != nil { + klog.Errorf("failed to new CNR monitor controller") + return false, err + } + } + + if cnrMonitorController != nil { + go 
cnrMonitorController.Run() + } + + return true, nil +} diff --git a/cmd/katalyst-controller/app/enablecontrollers.go b/cmd/katalyst-controller/app/enablecontrollers.go index dc9063449..0da0cc4ce 100644 --- a/cmd/katalyst-controller/app/enablecontrollers.go +++ b/cmd/katalyst-controller/app/enablecontrollers.go @@ -51,6 +51,7 @@ func init() { controllerInitializers.Store(controller.KCCControllerName, ControllerStarter{Starter: controller.StartKCCController}) controllerInitializers.Store(controller.SPDControllerName, ControllerStarter{Starter: controller.StartSPDController}) controllerInitializers.Store(controller.LifeCycleControllerName, ControllerStarter{Starter: controller.StartLifeCycleController}) + controllerInitializers.Store(controller.MonitorControllerName, ControllerStarter{Starter: controller.StartMonitorController}) } // RegisterControllerInitializer is used to register user-defined controllers diff --git a/cmd/katalyst-controller/app/options/controller.go b/cmd/katalyst-controller/app/options/controller.go index 0c21599e2..686e72739 100644 --- a/cmd/katalyst-controller/app/options/controller.go +++ b/cmd/katalyst-controller/app/options/controller.go @@ -28,6 +28,7 @@ type ControllersOptions struct { *KCCOptions *SPDOptions *LifeCycleOptions + *MonitorOptions } func NewControllersOptions() *ControllersOptions { @@ -36,6 +37,7 @@ func NewControllersOptions() *ControllersOptions { KCCOptions: NewKCCOptions(), SPDOptions: NewSPDOptions(), LifeCycleOptions: NewLifeCycleOptions(), + MonitorOptions: NewMonitorOptions(), } } @@ -44,6 +46,7 @@ func (o *ControllersOptions) AddFlags(fss *cliflag.NamedFlagSets) { o.KCCOptions.AddFlags(fss) o.SPDOptions.AddFlags(fss) o.LifeCycleOptions.AddFlags(fss) + o.MonitorOptions.AddFlags(fss) } // ApplyTo fills up config with options @@ -54,6 +57,7 @@ func (o *ControllersOptions) ApplyTo(c *controllerconfig.ControllersConfiguratio errList = append(errList, o.KCCOptions.ApplyTo(c.KCCConfig)) errList = append(errList, 
o.SPDOptions.ApplyTo(c.SPDConfig)) errList = append(errList, o.LifeCycleOptions.ApplyTo(c.LifeCycleConfig)) + errList = append(errList, o.MonitorOptions.ApplyTo(c.MonitorConfig)) return errors.NewAggregate(errList) } diff --git a/cmd/katalyst-controller/app/options/monitor.go b/cmd/katalyst-controller/app/options/monitor.go new file mode 100644 index 000000000..30ca5ebbe --- /dev/null +++ b/cmd/katalyst-controller/app/options/monitor.go @@ -0,0 +1,57 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + cliflag "k8s.io/component-base/cli/flag" + + "github.com/kubewharf/katalyst-core/pkg/config/controller" +) + +// MonitorOptions holds the configurations for Monitor. +type MonitorOptions struct { + // EnableCNRMonitor is a flag to enable CNR monitor controller + EnableCNRMonitor bool +} + +func NewMonitorOptions() *MonitorOptions { + return &MonitorOptions{ + EnableCNRMonitor: true, + } +} + +// AddFlags adds flags to the specified FlagSet. 
+func (o *MonitorOptions) AddFlags(fss *cliflag.NamedFlagSets) { + fs := fss.FlagSet("monitor") + + fs.BoolVar(&o.EnableCNRMonitor, "cnr-monitor-enable", o.EnableCNRMonitor, + "whether to enable the cnr controller") +} + +// ApplyTo fills up config with options +func (o *MonitorOptions) ApplyTo(c *controller.MonitorConfig) error { + c.EnableCNRMonitor = o.EnableCNRMonitor + return nil +} + +func (o *MonitorOptions) Config() (*controller.MonitorConfig, error) { + c := &controller.MonitorConfig{} + if err := o.ApplyTo(c); err != nil { + return nil, err + } + return c, nil +} diff --git a/examples/dedicated-normal-pod.yaml b/examples/dedicated-normal-pod.yaml new file mode 100644 index 000000000..6f44b5298 --- /dev/null +++ b/examples/dedicated-normal-pod.yaml @@ -0,0 +1,42 @@ +# Copyright 2022 The Katalyst Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: v1 +kind: Pod +metadata: + annotations: + "katalyst.kubewharf.io/qos_level": dedicated_cores + "katalyst.kubewharf.io/memory_enhancement": '{ + "numa_binding": "true", + "numa_exclusive": "true" + }' + name: dedicated-normal-pod + namespace: default +spec: + containers: + - name: stress + image: joedval/stress:latest + command: + - stress + - -c + - "1" + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: "1" + memory: 1Gi + limits: + cpu: "1" + memory: 1Gi + schedulerName: katalyst-scheduler \ No newline at end of file diff --git a/pkg/config/controller/controller_base.go b/pkg/config/controller/controller_base.go index 984508df5..0f236a6ab 100644 --- a/pkg/config/controller/controller_base.go +++ b/pkg/config/controller/controller_base.go @@ -46,6 +46,7 @@ type ControllersConfiguration struct { *KCCConfig *SPDConfig *LifeCycleConfig + *MonitorConfig } func NewGenericControllerConfiguration() *GenericControllerConfiguration { @@ -58,5 +59,6 @@ func NewControllersConfiguration() *ControllersConfiguration { KCCConfig: NewKCCConfig(), SPDConfig: NewSPDConfig(), LifeCycleConfig: NewLifeCycleConfig(), + MonitorConfig: NewMonitorConfig(), } } diff --git a/pkg/config/controller/monitor.go b/pkg/config/controller/monitor.go new file mode 100644 index 000000000..1b52a6c50 --- /dev/null +++ b/pkg/config/controller/monitor.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +type CNRMonitorConfig struct{} + +type MonitorConfig struct { + // EnableCNRMonitor is a flag to enable CNR monitor controller + EnableCNRMonitor bool + + *CNRMonitorConfig +} + +func NewMonitorConfig() *MonitorConfig { + return &MonitorConfig{ + EnableCNRMonitor: true, + CNRMonitorConfig: &CNRMonitorConfig{}, + } +} diff --git a/pkg/controller/lifecycle/cnr.go b/pkg/controller/lifecycle/cnr.go index 46a50331b..1e02740c0 100644 --- a/pkg/controller/lifecycle/cnr.go +++ b/pkg/controller/lifecycle/cnr.go @@ -184,7 +184,7 @@ func (cl *CNRLifecycle) addCNREventHandle(obj interface{}) { func (cl *CNRLifecycle) updateCNREventHandle(_, new interface{}) { c, ok := new.(*apis.CustomNodeResource) if !ok { - klog.Errorf("cannot convert oldObj to *apis.CNR: %v", c) + klog.Errorf("cannot convert newObj to *apis.CNR: %v", c) return } klog.V(4).Infof("notice addition of cnr %s", c.Name) diff --git a/pkg/controller/monitor/cnr.go b/pkg/controller/monitor/cnr.go new file mode 100644 index 000000000..2ebf9299b --- /dev/null +++ b/pkg/controller/monitor/cnr.go @@ -0,0 +1,336 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package monitor + +import ( + "context" + "fmt" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/cri-api/pkg/errors" + "k8s.io/klog/v2" + + apis "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + informers "github.com/kubewharf/katalyst-api/pkg/client/informers/externalversions/node/v1alpha1" + listers "github.com/kubewharf/katalyst-api/pkg/client/listers/node/v1alpha1" + "github.com/kubewharf/katalyst-core/pkg/client" + "github.com/kubewharf/katalyst-core/pkg/config/controller" + "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +const ( + cnrMonitorControllerName = "cnr-monitor" + cnrMonitorWorkerCount = 1 +) + +const ( + // maxToleranceLantency is the max tolerance lantency for cnr report lantency + maxToleranceLantency = 5 * time.Minute +) + +type CNRMonitorController struct { + ctx context.Context + + client *client.GenericClientSet + + cnrListerSynced cache.InformerSynced + cnrLister listers.CustomNodeResourceLister + nodeListerSynced cache.InformerSynced + nodeLister corelisters.NodeLister + podListerSynced cache.InformerSynced + podLister corelisters.PodLister + + // queue for cnr + cnrSyncQueue workqueue.RateLimitingInterface + + // metricsEmitter for emit metrics + metricsEmitter metrics.MetricEmitter + + // podTimeMap for record pod scheduled time + podTimeMap sync.Map +} + +// NewCNRMonitorController create a new CNRMonitorController +func NewCNRMonitorController( + ctx context.Context, + genericConf *generic.GenericConfiguration, + _ *controller.GenericControllerConfiguration, + _ *controller.CNRMonitorConfig, + client *client.GenericClientSet, + 
nodeInformer coreinformers.NodeInformer, + podInformer coreinformers.PodInformer, + cnrInformer informers.CustomNodeResourceInformer, + metricsEmitter metrics.MetricEmitter) (*CNRMonitorController, error) { + + cnrMonitorController := &CNRMonitorController{ + ctx: ctx, + client: client, + cnrSyncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cnrMonitorControllerName), + podTimeMap: sync.Map{}, + } + + // init cnr informer + cnrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: cnrMonitorController.addCNREventHandler, + UpdateFunc: cnrMonitorController.updateCNREventHandler, + }) + // init cnr lister + cnrMonitorController.cnrLister = cnrInformer.Lister() + // init cnr synced + cnrMonitorController.cnrListerSynced = cnrInformer.Informer().HasSynced + + // init node lister + cnrMonitorController.nodeLister = nodeInformer.Lister() + // init node synced + cnrMonitorController.nodeListerSynced = nodeInformer.Informer().HasSynced + + // init pod informer + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: cnrMonitorController.updatePodEventHandler, + }) + // init pod lister + cnrMonitorController.podLister = podInformer.Lister() + // init pod synced + cnrMonitorController.podListerSynced = podInformer.Informer().HasSynced + + if metricsEmitter == nil { + // if metricsEmitter is nil, use dummy metrics + cnrMonitorController.metricsEmitter = metrics.DummyMetrics{} + } else { + // if metricsEmitter is not nil, use metricsEmitter with tags + cnrMonitorController.metricsEmitter = metricsEmitter.WithTags(cnrMonitorControllerName) + } + + return cnrMonitorController, nil +} + +func (ctrl *CNRMonitorController) Run() { + defer utilruntime.HandleCrash() + defer ctrl.cnrSyncQueue.ShutDown() + defer klog.Infof("Shutting down %s controller", cnrMonitorControllerName) + + // wait for cnr cache sync + if !cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced, 
ctrl.nodeListerSynced, ctrl.podListerSynced) {
+		utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s controller", cnrMonitorControllerName))
+		return
+	}
+
+	klog.Infof("Caches are synced for %s controller", cnrMonitorControllerName)
+	klog.Infof("start %d workers for %s controller", cnrMonitorWorkerCount, cnrMonitorControllerName)
+
+	for i := 0; i < cnrMonitorWorkerCount; i++ {
+		go wait.Until(ctrl.cnrMonitorWorker, time.Second, ctrl.ctx.Done())
+	}
+
+	// run gcPodTimeMap once every maxToleranceLantency until the context is done
+	klog.Infof("start gc podTimeMap...")
+	go wait.Until(ctrl.gcPodTimeMap, maxToleranceLantency, ctrl.ctx.Done())
+
+	<-ctrl.ctx.Done()
+}
+
+// cnrMonitorWorker keeps processing queued CNR keys until processNextCNR reports the queue has quit.
+func (ctrl *CNRMonitorController) cnrMonitorWorker() {
+	for ctrl.processNextCNR() {
+	}
+}
+
+// processNextCNR dequeues items, processes them, and marks them done.
+// It enforces that the sync is never invoked concurrently with the same key.
+func (ctrl *CNRMonitorController) processNextCNR() bool {
+	key, quit := ctrl.cnrSyncQueue.Get()
+	if quit {
+		return false
+	}
+	defer ctrl.cnrSyncQueue.Done(key)
+
+	err := ctrl.syncCNR(key.(string))
+	if err == nil {
+		ctrl.cnrSyncQueue.Forget(key)
+		return true
+	}
+
+	// sync failed: report the error and requeue the key with rate limiting
+	utilruntime.HandleError(fmt.Errorf("sync %s failed with %v", key, err))
+	ctrl.cnrSyncQueue.AddRateLimited(key)
+
+	return true
+}
+
+// syncCNR runs every anomaly check against the CNR named by key and emits one metric
+// per detected anomaly; a CNR that has any anomaly is re-enqueued to be checked again after 30s.
+func (ctrl *CNRMonitorController) syncCNR(key string) error {
+	_, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+
+	cnr, err := ctrl.cnrLister.Get(name)
+	// NOTE(review): errors is k8s.io/cri-api/pkg/errors in this file; confirm its IsNotFound recognizes
+	// the not-found error returned by a lister (k8s.io/apimachinery/pkg/api/errors may be intended).
+	if errors.IsNotFound(err) {
+		// cnr is deleted, so we can skip
+		klog.Infof("CNR has been deleted %v", key)
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+
+	hasAnomaly := false
+	// check numa exclusive anomaly
+	klog.Infof("Check Numa Exclusive Anomaly...")
+	if ctrl.checkNumaExclusiveAnomaly(cnr) {
+		hasAnomaly = true
+		klog.Infof("Emit Numa Exclusive Anomaly metric...")
+		if err := ctrl.emitCNRAnomalyMetric(cnr, reasonNumaExclusiveAnomaly); err != nil {
+			return err
+		}
+	}
+	// check numa allocatable sum anomaly
+	klog.Infof("Check Numa Allocatable Sum Anomaly...")
+	if ctrl.checkNumaAllocatableSumAnomaly(cnr) {
+		hasAnomaly = true
+		klog.Infof("Emit Numa Allocatable Sum Anomaly metric...")
+		if err := ctrl.emitCNRAnomalyMetric(cnr, reasonNumaAllocatableSumAnomaly); err != nil {
+			return err
+		}
+	}
+	// check pod allocation sum anomaly
+	klog.Infof("Check Pod Allocation Sum Anomaly...")
+	if ctrl.checkPodAllocationSumAnomaly(cnr) {
+		hasAnomaly = true
+		klog.Infof("Emit Pod Allocation Sum Anomaly metric...")
+		if err := ctrl.emitCNRAnomalyMetric(cnr, reasonPodAllocationSumAnomaly); err != nil {
+			return err
+		}
+	}
+
+	// an anomalous CNR is re-enqueued with AddAfter so that it is checked again in 30s
+	if hasAnomaly {
+		klog.Infof("CNR %s has anomaly, re-enqueue it after 30s", cnr.Name)
+		ctrl.cnrSyncQueue.AddAfter(key, 30*time.Second)
+	}
+
+	return nil
+}
+
+// enqueueCNR enqueues the given CNR in the work queue.
+func (ctrl *CNRMonitorController) enqueueCNR(cnr *apis.CustomNodeResource) { + if cnr == nil { + klog.Warning("trying to enqueue a nil cnr") + return + } + + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(cnr) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Cound't get key for CNR %+v: %v", cnr, err)) + return + } + ctrl.cnrSyncQueue.Add(key) +} + +func (ctrl *CNRMonitorController) addCNREventHandler(obj interface{}) { + klog.Infof("CNR create event found...") + cnr, ok := obj.(*apis.CustomNodeResource) + if !ok { + klog.Errorf("cannot convert obj to *apis.CNR") + return + } + klog.V(4).Infof("notice addition of cnr %s", cnr.Name) + + ctrl.enqueueCNR(cnr) +} + +func (ctrl *CNRMonitorController) updateCNREventHandler(_, newObj interface{}) { + klog.Infof("CNR update event found...") + cnr, ok := newObj.(*apis.CustomNodeResource) + if !ok { + klog.Errorf("cannot convert newObj to *apis.CNR") + return + } + klog.V(4).Infof("notice update of cnr %s", cnr.Name) + + // check and emit cnr pod report lantency metric + klog.Infof("Check and Emit CNR Report Lantency metric...") + err := ctrl.checkAndEmitCNRReportLantencyMetric(cnr) + if err != nil { + klog.Errorf("check and emit cnr report lantency metric failed: %v", err) + } + + ctrl.enqueueCNR(cnr) +} + +func (ctrl *CNRMonitorController) updatePodEventHandler(oldObj, newObj interface{}) { + klog.Infof("Pod update event found...") + oldPod, ok := oldObj.(*corev1.Pod) + if !ok { + klog.Errorf("cannot convert oldObj to Pod") + return + } + newPod, ok := newObj.(*corev1.Pod) + if !ok { + klog.Errorf("cannot convert newObj to Pod") + return + } + if oldPod.Spec.NodeName == "" && newPod.Spec.NodeName != "" { + klog.Infof("notice pod: %v scheduled to node: %v", newPod.Name, newPod.Spec.NodeName) + // record pod scheduled time + ctrl.podTimeMap.Store(native.GenerateUniqObjectUIDKey(newPod), time.Now()) + } +} + +// gcPodTimeMap gc podTimeMap which over maxToleranceLantency not used +func (ctrl 
*CNRMonitorController) gcPodTimeMap() { + klog.Infof("gc podTimeMap...") + ctrl.podTimeMap.Range(func(key, value interface{}) bool { + notUsedTime := time.Now().Sub(value.(time.Time)) + if notUsedTime > maxToleranceLantency { + klog.Infof("gc podTimeMap: %v, which not used over %v minutes", key, notUsedTime.Minutes()) + ctrl.podTimeMap.Delete(key) + namespace, podName, _, err := native.ParseUniqObjectUIDKey(key.(string)) + if err != nil { + klog.Errorf("failed to parse uniq object uid key %s", key) + return true + } + pod, err := ctrl.podLister.Pods(namespace).Get(podName) + if err != nil { + klog.Errorf("failed to get pod %s/%s", namespace, podName) + return true + } + if !native.PodIsTerminated(pod) { + klog.Infof("Emit Timeout CNR Report Lantency metric...") + // emit timeout cnr report lantency metric + ctrl.emitCNRReportLantencyMetric(pod.Spec.NodeName, key.(string), maxToleranceLantency.Milliseconds(), "true") + if err != nil { + klog.Errorf("emit cnr report lantency metric failed: %v", err) + } + } + } + return true // return true to continue iterating + }) +} diff --git a/pkg/controller/monitor/cnr_indicator.go b/pkg/controller/monitor/cnr_indicator.go new file mode 100644 index 000000000..79dec1179 --- /dev/null +++ b/pkg/controller/monitor/cnr_indicator.go @@ -0,0 +1,220 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package monitor
+
+import (
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/klog/v2"
+
+	"github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
+	"github.com/kubewharf/katalyst-core/pkg/config/generic"
+	"github.com/kubewharf/katalyst-core/pkg/metrics"
+	"github.com/kubewharf/katalyst-core/pkg/util/native"
+	"github.com/kubewharf/katalyst-core/pkg/util/qos"
+)
+
+const (
+	metricsNameCNRReportAnomaly  = "cnr_report_anomaly"
+	metricsNameCNRReportLantency = "cnr_report_latency"
+)
+
+const (
+	// reasonNumaExclusiveAnomaly is the reason for numa exclusive anomaly
+	// when numa_binding and numa_exclusive are both set
+	// the pod with numa_binding and numa_exclusive shares the numa with other pods
+	reasonNumaExclusiveAnomaly = "NumaExclusiveAnomaly"
+	// reasonNumaAllocatableSumAnomaly is the reason for numa allocatable sum anomaly
+	// when the node's sum of numa allocatable is not equal to the node allocatable
+	reasonNumaAllocatableSumAnomaly = "AllocatableSumAnomaly"
+	// reasonPodAllocationSumAnomaly is the reason for pod allocation sum anomaly
+	// when the numa's sum of pod allocation is greater than the numa allocatable
+	reasonPodAllocationSumAnomaly = "PodAllocationSumAnomaly"
+)
+
+// checkNumaExclusiveAnomaly reports whether any numa zone holds a pod that is both numa_binding
+// and numa_exclusive while more than one numa_binding pod is allocated on that same numa.
+func (ctrl *CNRMonitorController) checkNumaExclusiveAnomaly(cnr *v1alpha1.CustomNodeResource) bool {
+	qosConf := generic.NewQoSConfiguration()
+	for _, socket := range cnr.Status.TopologyZone {
+		for _, numa := range socket.Children {
+			if numa.Type != v1alpha1.TopologyTypeNuma {
+				// only check numa
+				continue
+			}
+			var numaBindingPods []*v1.Pod
+			// collect the numa_binding pods allocated on this numa; unparsable or unknown consumers are skipped
+			for _, allocation := range numa.Allocations {
+				key := allocation.Consumer
+				namespace, podname, _, err := native.ParseUniqObjectUIDKey(key)
+				if err != nil {
+					klog.Errorf("[CNRIndicatorNumaExclusiveAnomaly] failed to parse uniq object uid key %s", key)
+					continue
+				}
+				pod, err := ctrl.podLister.Pods(namespace).Get(podname)
+				if err != nil {
+					klog.Errorf("[CNRIndicatorNumaExclusiveAnomaly] failed to get pod %s", key)
+					continue
+				}
+				if qos.IsPodNumaBinding(qosConf, pod) {
+					numaBindingPods = append(numaBindingPods, pod)
+				}
+			}
+			// if the pod with numa_binding and numa_exclusive shares the numa with other pods, return true
+			for _, pod := range numaBindingPods {
+				if qos.IsPodNumaExclusive(qosConf, pod) && len(numaBindingPods) > 1 {
+					return true
+				}
+			}
+		}
+	}
+	return false
+}
+
+// checkNumaAllocatableSumAnomaly reports whether the numa allocatable CPU sum differs from the node allocatable CPU, or the numa
+// allocatable memory sum exceeds the node memory capacity; CPU is compared in milli-CPU so fractional cores are not truncated away.
+func (ctrl *CNRMonitorController) checkNumaAllocatableSumAnomaly(cnr *v1alpha1.CustomNodeResource) bool {
+	node, err := ctrl.nodeLister.Get(cnr.Name)
+	if err != nil {
+		klog.Errorf("[CNRIndicatorNumaAllocatableSumAnomaly] failed to get node %s", cnr.Name)
+		return false
+	}
+
+	nodeCPUMilli, nodeMemCapacity := node.Status.Allocatable.Cpu().MilliValue(), node.Status.Capacity.Memory().Value()
+	klog.Infof("[CNRIndicatorNumaAllocatableSumAnomaly] nodeCpuAllocatable: %dm, nodeMemCapacity: %d", nodeCPUMilli, nodeMemCapacity)
+	var numaCPUMilliSum, numaMemAllocatableSum int64
+	for _, socket := range cnr.Status.TopologyZone {
+		for _, numa := range socket.Children {
+			if numa.Type != v1alpha1.TopologyTypeNuma {
+				// only check numa
+				continue
+			}
+			numaCPUMilliSum += numa.Resources.Allocatable.Cpu().MilliValue()
+			numaMemAllocatableSum += numa.Resources.Allocatable.Memory().Value()
+		}
+	}
+	klog.Infof("[CNRIndicatorNumaAllocatableSumAnomaly] numaCpuAllocatableSum: %dm, numaMemAllocatableSum: %d", numaCPUMilliSum, numaMemAllocatableSum)
+	// TODO: this rule may need to adapt to the scheduler in the future
+	return numaCPUMilliSum != nodeCPUMilli || numaMemAllocatableSum > nodeMemCapacity
+}
+
+// checkPodAllocationSumAnomaly reports whether, on any numa, the summed requests of numa_binding pods exceed the numa allocatable;
+// CPU is summed in milli-CPU because truncating each request to whole cores would count e.g. two 500m pods as 0.
+func (ctrl *CNRMonitorController) checkPodAllocationSumAnomaly(cnr *v1alpha1.CustomNodeResource) bool {
+	qosConf := generic.NewQoSConfiguration()
+	for _, socket := range cnr.Status.TopologyZone {
+		for _, numa := range socket.Children {
+			if numa.Type != v1alpha1.TopologyTypeNuma {
+				// only check numa
+				continue
+			}
+			numaCPUMilli, numaMemAllocatable := numa.Resources.Allocatable.Cpu().MilliValue(), numa.Resources.Allocatable.Memory().Value()
+			klog.Infof("[CNRIndicatorPodAllocationSumAnomaly] numaCpuAllocatable: %dm, numaMemAllocatable: %d", numaCPUMilli, numaMemAllocatable)
+			var podCPUMilliSum, podMemAllocationSum int64
+			for _, allocation := range numa.Allocations {
+				key := allocation.Consumer
+				namespace, podname, _, err := native.ParseUniqObjectUIDKey(key)
+				if err != nil {
+					klog.Errorf("[CNRIndicatorPodAllocationSumAnomaly] failed to parse uniq object uid key %s", key)
+					continue
+				}
+				pod, err := ctrl.podLister.Pods(namespace).Get(podname)
+				if err != nil {
+					klog.Errorf("[CNRIndicatorPodAllocationSumAnomaly] failed to get pod %s", key)
+					continue
+				}
+				// only check the pod with numa binding for now
+				if qos.IsPodNumaBinding(qosConf, pod) {
+					podCPUMilliSum += allocation.Requests.Cpu().MilliValue()
+					podMemAllocationSum += allocation.Requests.Memory().Value()
+				}
+			}
+			klog.Infof("[CNRIndicatorPodAllocationSumAnomaly] podCpuAllocationSum: %dm, podMemAllocationSum: %d", podCPUMilliSum, podMemAllocationSum)
+			if podCPUMilliSum > numaCPUMilli || podMemAllocationSum > numaMemAllocatable {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// emitCNRAnomalyMetric emit CNR anomaly metric
+func (ctrl *CNRMonitorController) emitCNRAnomalyMetric(cnr *v1alpha1.CustomNodeResource, reason string) error {
+	_ = 
ctrl.metricsEmitter.StoreInt64(metricsNameCNRReportAnomaly, 1, metrics.MetricTypeNameRaw, + metrics.MetricTag{ + Key: "node_name", Val: cnr.Name, + }, + metrics.MetricTag{ + Key: "reason", Val: reason, + }, + ) + + return nil +} + +// checkAndEmitCNRReportLantencyMetric check and emit CNR report lantency metric +func (ctrl *CNRMonitorController) checkAndEmitCNRReportLantencyMetric(cnr *v1alpha1.CustomNodeResource) error { + for _, socket := range cnr.Status.TopologyZone { + for _, numa := range socket.Children { + if numa.Type != v1alpha1.TopologyTypeNuma { + // only check numa + continue + } + for _, allocation := range numa.Allocations { + key := allocation.Consumer + scheduledTime, ok := ctrl.podTimeMap.Load(key) + // if the pod is not in podTimeMap or if the podTimeMap value is zero, continue + if !ok || scheduledTime.(time.Time).IsZero() { + continue + } + // emit cnr report lantency metric + ctrl.emitCNRReportLantencyMetric(cnr.Name, key, time.Since(scheduledTime.(time.Time)).Milliseconds(), "false") + // delete the used data from podTimeMap + ctrl.podTimeMap.Delete(key) + } + } + } + return nil +} + +// emitCNRReportLantencyMetric emit CNR report lantency metric +func (ctrl *CNRMonitorController) emitCNRReportLantencyMetric(nodeName string, key string, lantency int64, isTimeOut string) { + namespace, podName, uid, err := native.ParseUniqObjectUIDKey(key) + if err != nil { + klog.Errorf("[CNRReportLantency] failed to parse uniq object uid key %s", key) + } + klog.Infof("[CNRReportLantency] pod %s/%s/%s report lantency: %dms", namespace, podName, uid, lantency) + _ = ctrl.metricsEmitter.StoreFloat64(metricsNameCNRReportLantency, float64(lantency), + metrics.MetricTypeNameRaw, + metrics.MetricTag{ + Key: "node_name", Val: nodeName, + }, + metrics.MetricTag{ + Key: "namespace", Val: namespace, + }, + metrics.MetricTag{ + Key: "pod_name", Val: podName, + }, + metrics.MetricTag{ + Key: "pod_uid", Val: uid, + }, + metrics.MetricTag{ + Key: "time_out", Val: 
isTimeOut, + }, + ) +} diff --git a/pkg/controller/monitor/cnr_indicator_test.go b/pkg/controller/monitor/cnr_indicator_test.go new file mode 100644 index 000000000..61858827b --- /dev/null +++ b/pkg/controller/monitor/cnr_indicator_test.go @@ -0,0 +1,783 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package monitor + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" + + "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/consts" + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" + metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool" +) + +func Test_checkNumaExclusiveAnomaly(t *testing.T) { + t.Parallel() + + type fields struct { + pods []*v1.Pod + cnr *v1alpha1.CustomNodeResource + } + + tests := []struct { + name string + fields fields + wantResult bool + }{ + { + name: "numa exclusive anomaly with numa binding and numa exclusive pod", + fields: fields{ + pods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod1", + Namespace: "test-namespace", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: 
consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{ + "numa_binding": "true", + "numa_exclusive": "true" + }`, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod2", + Namespace: "test-namespace", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{ + "numa_binding": "true", + "numa_exclusive": "true" + }`, + }, + }, + }, + }, + cnr: &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1alpha1.CustomNodeResourceStatus{ + TopologyZone: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNIC, + Attributes: []v1alpha1.Attribute{ + { + Name: "katalyst.kubewharf.io/resource_identifier", + Value: "enp0s3", + }, + }, + Name: "eth0", + }, + { + Type: v1alpha1.TopologyTypeNuma, + Allocations: []*v1alpha1.Allocation{ + { + Consumer: "test-namespace/test-pod1/13414141", + }, + { + Consumer: "test-namespace/test-pod2/31413414", + }, + }, + Name: "0", + }, + }, + }, + }, + }, + }, + }, + wantResult: true, + }, + { + name: "numa exclusive anomaly test with non numa binding and non numa exclusive pod", + fields: fields{ + pods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod1", + Namespace: "test-namespace", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod2", + Namespace: "test-namespace", + }, + }, + }, + cnr: &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1alpha1.CustomNodeResourceStatus{ + TopologyZone: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNIC, + Attributes: []v1alpha1.Attribute{ + { + Name: "katalyst.kubewharf.io/resource_identifier", + Value: "enp0s3", + }, + }, + Name: "eth0", + }, + { + Type: 
v1alpha1.TopologyTypeNuma, + Allocations: []*v1alpha1.Allocation{ + { + Consumer: "test-namespace/test-pod1/13414141", + }, + { + Consumer: "test-namespace/test-pod2/31413414", + }, + }, + Name: "0", + }, + }, + }, + }, + }, + }, + }, + wantResult: false, + }, + { + name: "numa exclusive anomaly with non numa binding and numa exclusive pod", + fields: fields{ + pods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod1", + Namespace: "test-namespace", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{ + "numa_binding": "false", + "numa_exclusive": "true" + }`, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod2", + Namespace: "test-namespace", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{ + "numa_binding": "true", + "numa_exclusive": "false" + }`, + }, + }, + }, + }, + cnr: &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1alpha1.CustomNodeResourceStatus{ + TopologyZone: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNIC, + Attributes: []v1alpha1.Attribute{ + { + Name: "katalyst.kubewharf.io/resource_identifier", + Value: "enp0s3", + }, + }, + Name: "eth0", + }, + { + Type: v1alpha1.TopologyTypeNuma, + Allocations: []*v1alpha1.Allocation{ + { + Consumer: "test-namespace/test-pod1/13414141", + }, + { + Consumer: "test-namespace/test-pod2/31413414", + }, + }, + Name: "0", + }, + }, + }, + }, + }, + }, + }, + wantResult: false, + }, + { + name: "numa exclusive anomaly with numa binding and non numa exclusive pod", + fields: fields{ + pods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod1", + Namespace: "test-namespace", + Annotations: map[string]string{ + 
consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{ + "numa_binding": "true", + "numa_exclusive": "false" + }`, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod2", + Namespace: "test-namespace", + }, + }, + }, + cnr: &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1alpha1.CustomNodeResourceStatus{ + TopologyZone: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNIC, + Attributes: []v1alpha1.Attribute{ + { + Name: "katalyst.kubewharf.io/resource_identifier", + Value: "enp0s3", + }, + }, + Name: "eth0", + }, + { + Type: v1alpha1.TopologyTypeNuma, + Allocations: []*v1alpha1.Allocation{ + { + Consumer: "test-namespace/test-pod1/13414141", + }, + { + Consumer: "test-namespace/test-pod2/31413414", + }, + }, + Name: "0", + }, + }, + }, + }, + }, + }, + }, + wantResult: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + genericCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{tt.fields.pods[0], tt.fields.pods[1]}, []runtime.Object{tt.fields.cnr}) + assert.NoError(t, err) + + conf, err := options.NewOptions().Config() + require.NoError(t, err) + require.NotNil(t, conf) + + ctrl, err := NewCNRMonitorController( + context.Background(), + conf.GenericConfiguration, + conf.GenericControllerConfiguration, + conf.CNRMonitorConfig, + genericCtx.Client, + genericCtx.KubeInformerFactory.Core().V1().Nodes(), + genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), + metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), + ) + assert.NoError(t, err) + + // test cache synced + genericCtx.KubeInformerFactory.Start(ctrl.ctx.Done()) + genericCtx.InternalInformerFactory.Start(ctrl.ctx.Done()) + 
+ cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced, ctrl.podListerSynced) + time.Sleep(100 * time.Millisecond) + + result := ctrl.checkNumaExclusiveAnomaly(tt.fields.cnr) + assert.Equal(t, tt.wantResult, result) + }) + } +} + +func Test_checkNumaAllocatableSumAnomaly(t *testing.T) { + t.Parallel() + + type fields struct { + node *v1.Node + cnr *v1alpha1.CustomNodeResource + } + + tests := []struct { + name string + fields fields + wantResult bool + }{ + { + name: "numa allocatable sum anomaly not found", + fields: fields{ + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(6), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(2000), resource.BinarySI), + }, + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(6), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(2000), resource.BinarySI), + }, + }, + }, + cnr: &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1alpha1.CustomNodeResourceStatus{ + TopologyZone: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNIC, + Attributes: []v1alpha1.Attribute{ + { + Name: "katalyst.kubewharf.io/resource_identifier", + Value: "enp0s3", + }, + }, + Name: "eth0", + }, + { + Type: v1alpha1.TopologyTypeNuma, + Resources: v1alpha1.Resources{ + Allocatable: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + Name: "0", + }, + { + Type: v1alpha1.TopologyTypeNuma, + Resources: v1alpha1.Resources{ + Allocatable: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + Name: 
"1", + }, + }, + }, + }, + }, + }, + }, + wantResult: false, + }, + { + name: "numa allocatable sum anomaly found", + fields: fields{ + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(5), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(5), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + }, + cnr: &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1alpha1.CustomNodeResourceStatus{ + TopologyZone: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNIC, + Attributes: []v1alpha1.Attribute{ + { + Name: "katalyst.kubewharf.io/resource_identifier", + Value: "enp0s3", + }, + }, + Name: "eth0", + }, + { + Type: v1alpha1.TopologyTypeNuma, + Resources: v1alpha1.Resources{ + Allocatable: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + Name: "0", + }, + }, + }, + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNuma, + Resources: v1alpha1.Resources{ + Allocatable: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + Name: "0", + }, + }, + }, + }, + }, + }, + }, + wantResult: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + genericCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{tt.fields.node}, []runtime.Object{tt.fields.cnr}) + assert.NoError(t, err) + + conf, err := 
options.NewOptions().Config() + require.NoError(t, err) + require.NotNil(t, conf) + + ctrl, err := NewCNRMonitorController( + context.Background(), + conf.GenericConfiguration, + conf.GenericControllerConfiguration, + conf.CNRMonitorConfig, + genericCtx.Client, + genericCtx.KubeInformerFactory.Core().V1().Nodes(), + genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), + metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), + ) + assert.NoError(t, err) + + // test cache synced + genericCtx.KubeInformerFactory.Start(ctrl.ctx.Done()) + genericCtx.InternalInformerFactory.Start(ctrl.ctx.Done()) + + cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced, ctrl.podListerSynced) + time.Sleep(100 * time.Millisecond) + + result := ctrl.checkNumaAllocatableSumAnomaly(tt.fields.cnr) + assert.Equal(t, tt.wantResult, result) + }) + } +} + +func Test_checkPodAllocationSumAnomaly(t *testing.T) { + t.Parallel() + + type fields struct { + pods []*v1.Pod + cnr *v1alpha1.CustomNodeResource + } + + tests := []struct { + name string + fields fields + wantResult bool + }{ + { + name: "pod allocatable sum anomaly not found", + fields: fields{ + pods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod1", + Namespace: "test-namespace", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{ + "numa_binding": "true", + "numa_exclusive": "false" + }`, + }, + }, + }, + }, + cnr: &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1alpha1.CustomNodeResourceStatus{ + TopologyZone: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNIC, + Attributes: []v1alpha1.Attribute{ + { + Name: 
"katalyst.kubewharf.io/resource_identifier", + Value: "enp0s3", + }, + }, + Name: "eth0", + }, + { + Type: v1alpha1.TopologyTypeNuma, + Allocations: []*v1alpha1.Allocation{ + { + Consumer: "test-namespace/test-pod1/13414141", + Requests: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + }, + Resources: v1alpha1.Resources{ + Allocatable: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + Name: "0", + }, + }, + }, + }, + }, + }, + }, + wantResult: false, + }, + { + name: "pod allocatable sum anomaly found", + fields: fields{ + pods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod1", + Namespace: "test-namespace", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: `{ + "numa_binding": "true", + "numa_exclusive": "false" + }`, + }, + }, + }, + }, + cnr: &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1alpha1.CustomNodeResourceStatus{ + TopologyZone: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNIC, + Attributes: []v1alpha1.Attribute{ + { + Name: "katalyst.kubewharf.io/resource_identifier", + Value: "enp0s3", + }, + }, + Name: "eth0", + }, + { + Type: v1alpha1.TopologyTypeNuma, + Allocations: []*v1alpha1.Allocation{ + { + Consumer: "test-namespace/test-pod1/13414141", + Requests: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(4), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1001), resource.BinarySI), + }, + }, + }, + Resources: v1alpha1.Resources{ + Allocatable: &v1.ResourceList{ + v1.ResourceCPU: 
*resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + Name: "0", + }, + }, + }, + }, + }, + }, + }, + wantResult: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + genericCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{tt.fields.pods[0]}, []runtime.Object{tt.fields.cnr}) + assert.NoError(t, err) + + conf, err := options.NewOptions().Config() + require.NoError(t, err) + require.NotNil(t, conf) + + ctrl, err := NewCNRMonitorController( + context.Background(), + conf.GenericConfiguration, + conf.GenericControllerConfiguration, + conf.CNRMonitorConfig, + genericCtx.Client, + genericCtx.KubeInformerFactory.Core().V1().Nodes(), + genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), + metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), + ) + assert.NoError(t, err) + + // test cache synced + genericCtx.KubeInformerFactory.Start(ctrl.ctx.Done()) + genericCtx.InternalInformerFactory.Start(ctrl.ctx.Done()) + + cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced, ctrl.podListerSynced) + time.Sleep(100 * time.Millisecond) + + result := ctrl.checkPodAllocationSumAnomaly(tt.fields.cnr) + assert.Equal(t, tt.wantResult, result) + }) + } +} + +func Test_checkAndEmitCNRReportLantencyMetric(t *testing.T) { + t.Parallel() + + cnr := &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: v1alpha1.CustomNodeResourceStatus{ + TopologyZone: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeSocket, + Children: []*v1alpha1.TopologyZone{ + { + Type: v1alpha1.TopologyTypeNIC, + Attributes: []v1alpha1.Attribute{ + { + Name: "katalyst.kubewharf.io/resource_identifier", + Value: "enp0s3", + }, + }, + Name: "eth0", + }, + { + Type: v1alpha1.TopologyTypeNuma, + Allocations: 
[]*v1alpha1.Allocation{ + { + Consumer: "test-namespace/test-pod1/1111111111", + Requests: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + }, + Resources: v1alpha1.Resources{ + Allocatable: &v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(int64(3), resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI), + }, + }, + Name: "0", + }, + }, + }, + }, + }, + } + + genericCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{}, []runtime.Object{cnr}) + assert.NoError(t, err) + + conf, err := options.NewOptions().Config() + require.NoError(t, err) + require.NotNil(t, conf) + + ctrl, err := NewCNRMonitorController( + context.Background(), + conf.GenericConfiguration, + conf.GenericControllerConfiguration, + conf.CNRMonitorConfig, + genericCtx.Client, + genericCtx.KubeInformerFactory.Core().V1().Nodes(), + genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), + metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), + ) + + ctrl.podTimeMap.Store("test-namespace/test-pod1/1111111111", time.Now()) + time.Sleep(2 * time.Millisecond) + assert.NoError(t, err) + + err = ctrl.checkAndEmitCNRReportLantencyMetric(cnr) + assert.NoError(t, err) + if _, ok := ctrl.podTimeMap.Load("test-namespace/test-pod1/1111111111"); ok { + t.Errorf("podTimeMap should not have key test-namespace/test-pod1/1111111111") + } +} diff --git a/pkg/controller/monitor/cnr_test.go b/pkg/controller/monitor/cnr_test.go new file mode 100644 index 000000000..4b89593ee --- /dev/null +++ b/pkg/controller/monitor/cnr_test.go @@ -0,0 +1,247 @@ +/* +Copyright 2022 The Katalyst Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package monitor + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" + + "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" + metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool" +) + +func TestCNRMonitor_Run(t *testing.T) { + t.Parallel() + + var ( + oldPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "uid1", + }, + Spec: corev1.PodSpec{ + NodeName: "", + }, + } + ) + + type fields struct { + pod *corev1.Pod + cnr *v1alpha1.CustomNodeResource + } + tests := []struct { + name string + fields fields + }{ + { + name: "test-pod-nodename-updated", + fields: fields{ + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "uid1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + }, + cnr: &v1alpha1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Labels: map[string]string{ + "test": "test", + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + genericCtx, err := 
katalyst_base.GenerateFakeGenericContext([]runtime.Object{oldPod}, []runtime.Object{tt.fields.cnr}) + assert.NoError(t, err) + + conf, err := options.NewOptions().Config() + require.NoError(t, err) + require.NotNil(t, conf) + + ctrl, err := NewCNRMonitorController( + context.Background(), + conf.GenericConfiguration, + conf.GenericControllerConfiguration, + conf.CNRMonitorConfig, + genericCtx.Client, + genericCtx.KubeInformerFactory.Core().V1().Nodes(), + genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), + metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), + ) + assert.NoError(t, err) + + // test cache synced + genericCtx.KubeInformerFactory.Start(ctrl.ctx.Done()) + genericCtx.InternalInformerFactory.Start(ctrl.ctx.Done()) + go ctrl.Run() + + cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced, ctrl.podListerSynced) + time.Sleep(100 * time.Millisecond) + + gotCNR, err := ctrl.cnrLister.Get(tt.fields.cnr.Name) + assert.NoError(t, err) + assert.Equal(t, tt.fields.cnr, gotCNR) + + gotPod, err := ctrl.podLister.Pods(oldPod.Namespace).Get(oldPod.Name) + assert.NoError(t, err) + assert.Equal(t, oldPod, gotPod) + + // test schedule pod to node + _, err = genericCtx.Client.KubeClient.CoreV1().Pods(tt.fields.pod.Namespace).Update(ctrl.ctx, tt.fields.pod, metav1.UpdateOptions{}) + assert.NoError(t, err) + time.Sleep(100 * time.Millisecond) + }) + } +} + +func Test_gcPodTimeMap(t *testing.T) { + t.Parallel() + + var ( + time1 = time.Now().Add(-6 * time.Minute) + time2 = time.Now().Add(-time.Second) + ) + type args struct { + pod *corev1.Pod + podTimeMap map[string]time.Time + } + tests := []struct { + name string + args args + want map[string]time.Time + }{ + { + name: "test-gc and emit timeout cnr report lantency metric", + args: args{ + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "test", + }, + Spec: 
corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "container-1", + Image: "nginx:latest", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: v1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "container-1", + Ready: true, + RestartCount: 0, + }, + }, + }, + }, + podTimeMap: map[string]time.Time{ + "test/pod1/1": time1, + "test/pod2/2": time2, + }, + }, + want: map[string]time.Time{ + "test/pod2/2": time2, + }, + }, + { + name: "test-no-gc", + args: args{ + podTimeMap: map[string]time.Time{ + "test/pod1/1": time2, + }, + }, + want: map[string]time.Time{ + "test/pod1/1": time2, + }, + }, + { + name: "test-empty", + args: args{ + podTimeMap: map[string]time.Time{}, + }, + want: map[string]time.Time{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + genericCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{tt.args.pod}, []runtime.Object{}) + assert.NoError(t, err) + + conf, err := options.NewOptions().Config() + require.NoError(t, err) + require.NotNil(t, conf) + + ctrl, err := NewCNRMonitorController( + context.Background(), + conf.GenericConfiguration, + conf.GenericControllerConfiguration, + conf.CNRMonitorConfig, + genericCtx.Client, + genericCtx.KubeInformerFactory.Core().V1().Nodes(), + genericCtx.KubeInformerFactory.Core().V1().Pods(), + genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(), + metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}), + ) + assert.NoError(t, err) + + // test cache synced + genericCtx.KubeInformerFactory.Start(ctrl.ctx.Done()) + genericCtx.InternalInformerFactory.Start(ctrl.ctx.Done()) + + cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced, ctrl.podListerSynced) + time.Sleep(100 * time.Millisecond) + + for k, v := range tt.args.podTimeMap { + ctrl.podTimeMap.Store(k, v) + } + + ctrl.gcPodTimeMap() + + podTimeMap := map[string]time.Time{} + ctrl.podTimeMap.Range(func(k, v 
interface{}) bool { + podTimeMap[k.(string)] = v.(time.Time) + return true + }) + + assert.Equal(t, tt.want, podTimeMap) + }) + } +} diff --git a/pkg/util/native/object.go b/pkg/util/native/object.go index d47f92bee..0c69111cc 100644 --- a/pkg/util/native/object.go +++ b/pkg/util/native/object.go @@ -39,6 +39,16 @@ func GenerateUniqObjectUIDKey(obj metav1.Object) string { return fmt.Sprintf("%s/%s/%s", obj.GetNamespace(), obj.GetName(), obj.GetUID()) } +// ParseUniqObjectUIDKey parse the given key into namespace, name and uid +func ParseUniqObjectUIDKey(key string) (namespace string, name string, uid string, err error) { + names := strings.Split(key, "/") + if len(names) != 3 { + return "", "", "", fmt.Errorf("workload key %s split error", key) + } + + return names[0], names[1], names[2], nil +} + // GenerateUniqObjectNameKey generate a uniq key (without UID) for the given object. func GenerateUniqObjectNameKey(obj metav1.Object) string { return GenerateNamespaceNameKey(obj.GetNamespace(), obj.GetName())