Skip to content

Commit

Permalink
feat: monitoring latency of reported information in KCNR
Browse files Browse the repository at this point in the history
Signed-off-by: zhy76 <958474674@qq.com>
  • Loading branch information
zhy76 committed Sep 13, 2023
1 parent 2e11296 commit 87193a3
Show file tree
Hide file tree
Showing 5 changed files with 380 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cmd/katalyst-controller/app/controller/cnr.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func StartCNRController(ctx context.Context, controlCtx *katalystbase.GenericCon
conf.GenericConfiguration,
conf.GenericControllerConfiguration,
controlCtx.Client,
// controlCtx.KubeInformerFactory.Core().V1().Pods(),
controlCtx.KubeInformerFactory.Core().V1().Pods(),
controlCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(),
controlCtx.EmitterPool.GetDefaultMetricsEmitter(),
)
Expand Down
98 changes: 70 additions & 28 deletions pkg/controller/cnr/cnr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/cri-api/pkg/errors"
Expand Down Expand Up @@ -51,16 +54,17 @@ type CNRController struct {

cnrListerSynced cache.InformerSynced
cnrLister listers.CustomNodeResourceLister
// podListerSynced cache.InformerSynced
// podLister corelisters.PodLister
podListerSynced cache.InformerSynced
podLister corelisters.PodLister

// queue for cnr
cnrSyncQueue workqueue.RateLimitingInterface
// // queue for pod
// podSyncQueue workqueue.RateLimitingInterface

// metricsEmitter for emit metrics
metricsEmitter metrics.MetricEmitter

// podTimeMap for record pod scheduled time
podTimeMap map[string]time.Time
}

// NewCNRController create a new CNRController
Expand All @@ -69,15 +73,15 @@ func NewCNRController(
genericConf *generic.GenericConfiguration,
_ *controller.GenericControllerConfiguration,
client *client.GenericClientSet,
// podInformer coreinformers.PodInformer,
podInformer coreinformers.PodInformer,
cnrInformer informers.CustomNodeResourceInformer,
metricsEmitter metrics.MetricEmitter) (*CNRController, error) {

cnrController := &CNRController{
ctx: ctx,
client: client,
cnrSyncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cnrControllerName),
// podSyncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod"),
podTimeMap: make(map[string]time.Time),
}

// init cnr informer
Expand All @@ -91,14 +95,13 @@ func NewCNRController(
cnrController.cnrListerSynced = cnrInformer.Informer().HasSynced

// init pod informer
// podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// AddFunc: cnrController.addPodEventHandler,
// UpdateFunc: cnrController.updatePodEventHandler,
// })
// // init pod lister
// cnrController.podLister = podInformer.Lister()
// // init pod synced
// cnrController.podListerSynced = podInformer.Informer().HasSynced
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: cnrController.updatePodEventHandler,
})
// init pod lister
cnrController.podLister = podInformer.Lister()
// init pod synced
cnrController.podListerSynced = podInformer.Informer().HasSynced

if metricsEmitter == nil {
// if metricsEmitter is nil, use dummy metrics
Expand All @@ -121,21 +124,25 @@ func NewCNRController(
func (ctrl *CNRController) Run() {
defer utilruntime.HandleCrash()
defer ctrl.cnrSyncQueue.ShutDown()
// defer ctrl.podSyncQueue.ShutDown()
defer klog.Infof("Shutting down %s controller", cnrControllerName)

// wait for cache sync
if !cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced) {
// wait for cnr cache sync
if !cache.WaitForCacheSync(ctrl.ctx.Done(), ctrl.cnrListerSynced, ctrl.podListerSynced) {
utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s controller", cnrControllerName))
return
}

klog.Infof("Caches are synced for %s controller", cnrControllerName)
klog.Infof("start %d workers for %s controller", cnrWorkerCount, cnrControllerName)

for i := 0; i < cnrWorkerCount; i++ {
go wait.Until(ctrl.cnrWorker, time.Second, ctrl.ctx.Done())
}

// gc podTimeMap
klog.Infof("start gc podTimeMap...")
go wait.Until(ctrl.gcPodTimeMap, 1*time.Minute, ctrl.ctx.Done())

<-ctrl.ctx.Done()
}

Expand Down Expand Up @@ -216,6 +223,11 @@ func (ctrl *CNRController) syncCNR(key string) error {

// enqueueCNR enqueues the given CNR in the work queue.
func (ctrl *CNRController) enqueueCNR(cnr *apis.CustomNodeResource) {
if cnr == nil {
klog.Warning("trying to enqueue a nil cnr")
return
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(cnr)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Cound't get key for CNR %+v: %v", cnr, err))
Expand All @@ -228,7 +240,7 @@ func (ctrl *CNRController) addCNREventHandler(obj interface{}) {
klog.Infof("CNR create event found...")
cnr, ok := obj.(*apis.CustomNodeResource)
if !ok {
klog.Errorf("cnanot convert obj to *apis.CNR: %v", obj)
klog.Errorf("cannot convert obj to *apis.CNR")
return
}
klog.V(4).Infof("notice addition of cnr %s", cnr.Name)
Expand All @@ -238,20 +250,50 @@ func (ctrl *CNRController) addCNREventHandler(obj interface{}) {

func (ctrl *CNRController) updateCNREventHandler(_, newObj interface{}) {
klog.Infof("CNR update event found...")
c, ok := newObj.(*apis.CustomNodeResource)
cnr, ok := newObj.(*apis.CustomNodeResource)
if !ok {
klog.Errorf("cannot convert newObj to *apis.CNR: %v", c)
klog.Errorf("cannot convert newObj to *apis.CNR")
return
}
klog.V(4).Infof("notice addition of cnr %s", c.Name)
klog.V(4).Infof("notice update of cnr %s", cnr.Name)

ctrl.enqueueCNR(c)
// emit cnr pod report lantency metric
klog.Infof("Emit CNR Report Lantency metric...")
err := ctrl.emitCNRReportLantencyMetric(cnr)
if err != nil {
klog.Errorf("emit cnr report lantency metric failed: %v", err)
}

ctrl.enqueueCNR(cnr)
}

// func (ctrl *CNRController) addPodEventHandler(obj interface{}) {
// ctrl.podSyncQueue.Add(obj)
// }
// updatePodEventHandler records the moment a pod is first observed as bound to
// a node, i.e. when Spec.NodeName goes from empty in the old object to
// non-empty in the new one. The timestamp is stored in podTimeMap keyed by the
// pod UID. Objects that are not *corev1.Pod are logged and ignored.
//
// NOTE(review): podTimeMap is a plain map that is written here and is also
// accessed by gcPodTimeMap and emitCNRReportLantencyMetric; no lock is visible
// around any of these accesses — confirm they can never run concurrently.
func (ctrl *CNRController) updatePodEventHandler(oldObj, newObj interface{}) {
	klog.Infof("Pod update event found...")

	previous, isPod := oldObj.(*corev1.Pod)
	if !isPod {
		klog.Errorf("cannot convert oldObj to Pod")
		return
	}
	current, isPod := newObj.(*corev1.Pod)
	if !isPod {
		klog.Errorf("cannot convert newObj to Pod")
		return
	}

	// Only the unscheduled -> scheduled transition is of interest.
	justScheduled := previous.Spec.NodeName == "" && current.Spec.NodeName != ""
	if !justScheduled {
		return
	}

	klog.Infof("notice pod: %v scheduled to node: %v", current.Name, current.Spec.NodeName)
	// Remember when the scheduling decision was observed.
	ctrl.podTimeMap[string(current.UID)] = time.Now()
}

// func (ctrl *CNRController) updatePodEventHandler(oldObj, newObj interface{}) {
// ctrl.podSyncQueue.Add(newObj)
// }
// gcPodTimeMap removes every podTimeMap entry whose recorded timestamp is more
// than one minute old, logging each key it drops. Entries at or below one
// minute of age are left in place.
//
// NOTE(review): the map is iterated and mutated here without any lock, while
// updatePodEventHandler and emitCNRReportLantencyMetric touch the same map —
// confirm these never overlap with this function.
func (ctrl *CNRController) gcPodTimeMap() {
	klog.Infof("gc podTimeMap...")
	for uid, recordedAt := range ctrl.podTimeMap {
		age := time.Since(recordedAt)
		if age <= time.Minute {
			continue
		}
		klog.Infof("gc podTimeMap: %v, which not used over %v minutes", uid, age.Minutes())
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(ctrl.podTimeMap, uid)
	}
}
44 changes: 42 additions & 2 deletions pkg/controller/cnr/cnr_indicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package cnr

import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

Expand All @@ -28,8 +30,8 @@ import (
)

const (
metricsNameCNRReportAnomaly = "cnr_report_anomaly"
// metricsNameCNRReportLantency = "cnr_report_latency"
metricsNameCNRReportAnomaly = "cnr_report_anomaly"
metricsNameCNRReportLantency = "cnr_report_latency"
)

const (
Expand Down Expand Up @@ -148,3 +150,41 @@ func (ctrl *CNRController) emitCNRAnomalyMetric(cnr *v1alpha1.CustomNodeResource

return nil
}

// emitCNRReportLantencyMetric emits the CNR report latency metric for each pod
// allocation found under a NUMA-typed child zone of cnr.
//
// For every allocation, the consumer key is parsed into namespace, pod name and
// UID. If podTimeMap holds a timestamp for that UID, the elapsed time since
// that timestamp (in milliseconds) is logged and stored as a raw metric tagged
// with namespace, podName and podUid, and the entry is then removed from
// podTimeMap so the same pod is reported at most once. Allocations whose key
// cannot be parsed, or whose UID has no recorded timestamp, are skipped.
//
// A nil cnr, nil zone entries and nil allocations are ignored. The function
// currently always returns nil.
//
// NOTE(review): podTimeMap is read and mutated here without a lock, while
// updatePodEventHandler and gcPodTimeMap access the same map — confirm these
// cannot run concurrently, or guard the map.
func (ctrl *CNRController) emitCNRReportLantencyMetric(cnr *v1alpha1.CustomNodeResource) error {
	if cnr == nil {
		return nil
	}

	for _, socket := range cnr.Status.TopologyZone {
		if socket == nil {
			continue
		}
		for _, numa := range socket.Children {
			if numa == nil || numa.Type != v1alpha1.TopologyTypeNuma {
				// only check numa
				continue
			}
			for _, allocation := range numa.Allocations {
				if allocation == nil {
					continue
				}
				key := allocation.Consumer
				namespace, podName, uid, err := native.ParseUniqObjectUIDKey(key)
				if err != nil {
					// The parsed parts cannot be trusted on error, so skip this
					// allocation instead of looking up an unreliable uid.
					klog.Errorf("[CNRReportLantency] failed to parse uniq object uid key %s: %v", key, err)
					continue
				}

				scheduledAt, ok := ctrl.podTimeMap[uid]
				if !ok {
					continue
				}

				// Compute once so the logged value and the emitted value agree.
				latencyMs := time.Since(scheduledAt).Milliseconds()
				klog.Infof("[CNRReportLantency] pod %s/%s/%s report lantency: %dms", namespace, podName, uid, latencyMs)

				// The store result is deliberately ignored: a failed emission
				// must not prevent the entry from being cleaned up below.
				_ = ctrl.metricsEmitter.StoreFloat64(metricsNameCNRReportLantency, float64(latencyMs),
					metrics.MetricTypeNameRaw,
					metrics.MetricTag{
						Key: "namespace", Val: namespace,
					},
					metrics.MetricTag{
						Key: "podName", Val: podName,
					},
					metrics.MetricTag{
						Key: "podUid", Val: uid,
					},
				)
				// delete the used data from podTimeMap
				delete(ctrl.podTimeMap, uid)
			}
		}
	}
	return nil
}
83 changes: 80 additions & 3 deletions pkg/controller/cnr/cnr_indicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cnr
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -307,7 +308,7 @@ func Test_checkNumaExclusiveAnomaly(t *testing.T) {
conf.GenericConfiguration,
conf.GenericControllerConfiguration,
genericCtx.Client,
// genericCtx.KubeInformerFactory.Core().V1().Pods(),
genericCtx.KubeInformerFactory.Core().V1().Pods(),
genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(),
metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}),
)
Expand Down Expand Up @@ -483,7 +484,7 @@ func Test_checkNumaAllocatableSumAnomaly(t *testing.T) {
conf.GenericConfiguration,
conf.GenericControllerConfiguration,
genericCtx.Client,
// genericCtx.KubeInformerFactory.Core().V1().Pods(),
genericCtx.KubeInformerFactory.Core().V1().Pods(),
genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(),
metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}),
)
Expand Down Expand Up @@ -620,7 +621,7 @@ func Test_checkPodAllocationSumAnomaly(t *testing.T) {
conf.GenericConfiguration,
conf.GenericControllerConfiguration,
genericCtx.Client,
// genericCtx.KubeInformerFactory.Core().V1().Pods(),
genericCtx.KubeInformerFactory.Core().V1().Pods(),
genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(),
metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}),
)
Expand All @@ -631,3 +632,79 @@ func Test_checkPodAllocationSumAnomaly(t *testing.T) {
})
}
}

// Test_emitCNRReportLantencyMetric verifies that emitting the report latency
// metric for a CNR consumes the matching podTimeMap entry: after the call, the
// UID of the pod listed in the NUMA zone's allocations must no longer be
// present in the map.
func Test_emitCNRReportLantencyMetric(t *testing.T) {
	t.Parallel()

	// One socket with a NIC child (must be skipped) and a NUMA child carrying a
	// single allocation for pod UID "1111111111".
	cnr := &v1alpha1.CustomNodeResource{
		ObjectMeta: metav1.ObjectMeta{
			Name: "test",
		},
		Status: v1alpha1.CustomNodeResourceStatus{
			TopologyZone: []*v1alpha1.TopologyZone{
				{
					Type: v1alpha1.TopologyTypeSocket,
					Children: []*v1alpha1.TopologyZone{
						{
							Type: v1alpha1.TopologyTypeNIC,
							Attributes: []v1alpha1.Attribute{
								{
									Name:  "katalyst.kubewharf.io/resource_identifier",
									Value: "enp0s3",
								},
							},
							Name: "eth0",
						},
						{
							Type: v1alpha1.TopologyTypeNuma,
							Allocations: []*v1alpha1.Allocation{
								{
									Consumer: "test-namespace/test-pod1/1111111111",
									Requests: &v1.ResourceList{
										v1.ResourceCPU:    *resource.NewQuantity(int64(3), resource.DecimalSI),
										v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI),
									},
								},
							},
							Resources: v1alpha1.Resources{
								Allocatable: &v1.ResourceList{
									v1.ResourceCPU:    *resource.NewQuantity(int64(3), resource.DecimalSI),
									v1.ResourceMemory: *resource.NewQuantity(int64(1000), resource.BinarySI),
								},
							},
							Name: "0",
						},
					},
				},
			},
		},
	}

	// Setup failures must abort the test: the values are dereferenced below.
	genericCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{}, []runtime.Object{cnr})
	require.NoError(t, err)

	conf, err := options.NewOptions().Config()
	require.NoError(t, err)
	require.NotNil(t, conf)

	ctrl, err := NewCNRController(
		context.Background(),
		conf.GenericConfiguration,
		conf.GenericControllerConfiguration,
		genericCtx.Client,
		genericCtx.KubeInformerFactory.Core().V1().Pods(),
		genericCtx.InternalInformerFactory.Node().V1alpha1().CustomNodeResources(),
		metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{}),
	)
	// Check the constructor error before touching ctrl, otherwise a failure
	// surfaces as a nil-pointer panic instead of a test failure.
	require.NoError(t, err)
	require.NotNil(t, ctrl)

	ctrl.podTimeMap = map[string]time.Time{
		"1111111111": time.Now(),
	}
	// Let a little time pass so the measured latency is non-zero.
	time.Sleep(2 * time.Millisecond)

	err = ctrl.emitCNRReportLantencyMetric(cnr)
	assert.NoError(t, err)
	if _, ok := ctrl.podTimeMap["1111111111"]; ok {
		t.Errorf("podTimeMap should not have key 1111111111")
	}
}
Loading

0 comments on commit 87193a3

Please sign in to comment.