From 134fe9c1ae0e359993f234c1d5258d5dce4cb2bc Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Thu, 15 Jun 2023 14:44:05 -0700 Subject: [PATCH] [receiver/k8s_cluster] Do not store unused data in the k8s API cache This change removes unused k8s cache data to reduces RAM utilization --- ...scluster-dont-store-unused-data-cache.yaml | 15 ++ .../k8sclusterreceiver/informer_transform.go | 31 ++++ .../informer_transform_test.go | 87 ++++++++++++ .../internal/collection/collector_test.go | 16 --- .../k8sclusterreceiver/internal/jobs/jobs.go | 23 +++ .../internal/jobs/jobs_test.go | 70 +++++++++ .../k8sclusterreceiver/internal/node/nodes.go | 23 +++ .../internal/node/nodes_test.go | 63 +++++++++ .../k8sclusterreceiver/internal/pod/pods.go | 50 +++++-- .../internal/pod/pods_test.go | 133 ++++++++++++++++++ .../internal/replicaset/replicasets.go | 29 ++++ .../internal/replicaset/replicasets_test.go | 69 +++++++++ receiver/k8sclusterreceiver/watcher.go | 6 +- 13 files changed, 589 insertions(+), 26 deletions(-) create mode 100644 .chloggen/k8scluster-dont-store-unused-data-cache.yaml create mode 100644 receiver/k8sclusterreceiver/informer_transform.go create mode 100644 receiver/k8sclusterreceiver/informer_transform_test.go diff --git a/.chloggen/k8scluster-dont-store-unused-data-cache.yaml b/.chloggen/k8scluster-dont-store-unused-data-cache.yaml new file mode 100644 index 000000000000..9fc00db38eac --- /dev/null +++ b/.chloggen/k8scluster-dont-store-unused-data-cache.yaml @@ -0,0 +1,15 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/k8s_cluster + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Do not store unused data in the k8s API cache to reduce RAM usage + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23417] diff --git a/receiver/k8sclusterreceiver/informer_transform.go b/receiver/k8sclusterreceiver/informer_transform.go new file mode 100644 index 000000000000..26797bdf9962 --- /dev/null +++ b/receiver/k8sclusterreceiver/informer_transform.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver" + +import ( + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset" +) + +// transformObject transforms the k8s object by removing the data that is not utilized by the receiver. +// Only highly utilized objects are transformed here while others are kept as is. +func transformObject(object interface{}) (interface{}, error) { + switch o := object.(type) { + case *corev1.Pod: + return pod.Transform(o), nil + case *corev1.Node: + return node.Transform(o), nil + case *appsv1.ReplicaSet: + return replicaset.Transform(o), nil + case *batchv1.Job: + return jobs.Transform(o), nil + } + return object, nil +} diff --git a/receiver/k8sclusterreceiver/informer_transform_test.go b/receiver/k8sclusterreceiver/informer_transform_test.go new file mode 100644 index 000000000000..d47f18075d5e --- /dev/null +++ b/receiver/k8sclusterreceiver/informer_transform_test.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package k8sclusterreceiver + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" +) + +func TestTransformObject(t *testing.T) { + i := 1 + intPtr := &i + tests := []struct { + name string + object interface{} + want interface{} + same bool + }{ + { + name: "pod", + object: testutils.NewPodWithContainer( + "1", + testutils.NewPodSpecWithContainer("container-name"), + testutils.NewPodStatusWithContainer("container-name", "container-id"), + ), + want: func() *corev1.Pod { + pod := testutils.NewPodWithContainer( + "1", + testutils.NewPodSpecWithContainer("container-name"), + testutils.NewPodStatusWithContainer("container-name", "container-id"), + ) + pod.Spec.Containers[0].Image = "" + pod.Status.ContainerStatuses[0].State = corev1.ContainerState{} + return pod + }(), + same: false, + }, + { + name: "node", + object: testutils.NewNode("1"), + want: testutils.NewNode("1"), + same: false, + }, + { + name: "replicaset", + object: testutils.NewReplicaSet("1"), + want: testutils.NewReplicaSet("1"), + same: false, + }, + { + name: "job", + object: testutils.NewJob("1"), + want: testutils.NewJob("1"), + same: false, + }, + { + // This is a case where we don't transform the object. + name: "hpa", + object: testutils.NewHPA("1"), + want: testutils.NewHPA("1"), + same: true, + }, + { + name: "invalid_type", + object: intPtr, + want: intPtr, + same: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := transformObject(tt.object) + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + if tt.same { + assert.Same(t, tt.object, got) + } else { + assert.NotSame(t, tt.object, got) + } + }) + } +} diff --git a/receiver/k8sclusterreceiver/internal/collection/collector_test.go b/receiver/k8sclusterreceiver/internal/collection/collector_test.go index 78ed31f86fda..d271ddfc57f0 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector_test.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector_test.go @@ -62,22 +62,6 @@ func TestDataCollectorSyncMetadata(t *testing.T) { }, }, }, - { - name: "Empty container id skips container resource", - metadataStore: &metadata.Store{}, - resource: testutils.NewPodWithContainer( - "0", - testutils.NewPodSpecWithContainer("container-name"), - testutils.NewPodStatusWithContainer("container-name", ""), - ), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { - ResourceIDKey: "k8s.pod.uid", - ResourceID: "test-pod-0-uid", - Metadata: commonPodMetadata, - }, - }, - }, { name: "Pod with Owner Reference", metadataStore: &metadata.Store{}, diff --git a/receiver/k8sclusterreceiver/internal/jobs/jobs.go b/receiver/k8sclusterreceiver/internal/jobs/jobs.go index 7f6311af2ebe..7619110d779b 100644 --- a/receiver/k8sclusterreceiver/internal/jobs/jobs.go +++ b/receiver/k8sclusterreceiver/internal/jobs/jobs.go @@ -9,6 +9,7 @@ import ( resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" @@ -51,6 +52,28 @@ var podsSuccessfulMetric = &metricspb.MetricDescriptor{ Type: metricspb.MetricDescriptor_GAUGE_INT64, } +// Transform transforms the job to remove the fields that we don't use to reduce RAM utilization. +// IMPORTANT: Make sure to update this function when using a new job fields. +func Transform(job *batchv1.Job) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: job.ObjectMeta.Name, + Namespace: job.ObjectMeta.Namespace, + UID: job.ObjectMeta.UID, + Labels: job.ObjectMeta.Labels, + }, + Spec: batchv1.JobSpec{ + Completions: job.Spec.Completions, + Parallelism: job.Spec.Parallelism, + }, + Status: batchv1.JobStatus{ + Active: job.Status.Active, + Succeeded: job.Status.Succeeded, + Failed: job.Status.Failed, + }, + } +} + func GetMetrics(j *batchv1.Job) []*agentmetricspb.ExportMetricsServiceRequest { metrics := make([]*metricspb.Metric, 0, 5) metrics = append(metrics, []*metricspb.Metric{ diff --git a/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go b/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go index 6f0dde926e78..957610c45976 100644 --- a/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go +++ b/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go @@ -7,7 +7,11 @@ import ( "testing" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" @@ -60,3 +64,69 @@ func TestJobMetrics(t *testing.T) { testutils.AssertMetricsInt(t, actualResourceMetrics[0].Metrics[2], "k8s.job.successful_pods", metricspb.MetricDescriptor_GAUGE_INT64, 3) } + +func TestTransform(t *testing.T) { + originalJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-job", + Namespace: "default", + UID: "my-job-uid", + Labels: map[string]string{ + "app": "my-app", + }, + }, + Spec: batchv1.JobSpec{ + Completions: func() *int32 { completions := int32(1); return &completions }(), + Parallelism: func() *int32 { parallelism := int32(1); return ¶llelism }(), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "my-app", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-container", + Image: "busybox", + Command: []string{"echo", "Hello, World!"}, + ImagePullPolicy: corev1.PullAlways, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + Status: batchv1.JobStatus{ + Active: 1, + Succeeded: 2, + Failed: 3, + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + }, + }, + }, + } + wantJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-job", + Namespace: "default", + UID: "my-job-uid", + Labels: map[string]string{ + "app": "my-app", + }, + }, + Spec: batchv1.JobSpec{ + Completions: func() *int32 { completions := int32(1); return &completions }(), + Parallelism: func() *int32 { parallelism := int32(1); return ¶llelism }(), + }, + Status: batchv1.JobStatus{ + Active: 1, + Succeeded: 2, + Failed: 3, + }, + } + assert.Equal(t, wantJob, Transform(originalJob)) +} diff --git a/receiver/k8sclusterreceiver/internal/node/nodes.go b/receiver/k8sclusterreceiver/internal/node/nodes.go index d2f71bc88170..90c0411268ab 100644 --- a/receiver/k8sclusterreceiver/internal/node/nodes.go +++ b/receiver/k8sclusterreceiver/internal/node/nodes.go @@ -14,6 +14,7 @@ import ( conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" @@ -34,6 +35,28 @@ var allocatableDesciption = map[string]string{ "storage": "How many bytes of storage remaining that the node can allocate to pods", } +// Transform transforms the node to remove the fields that we don't use to reduce RAM utilization. +// IMPORTANT: Make sure to update this function when using a new node fields. +func Transform(node *corev1.Node) *corev1.Node { + newNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node.ObjectMeta.Name, + UID: node.ObjectMeta.UID, + Labels: node.ObjectMeta.Labels, + }, + Status: corev1.NodeStatus{ + Allocatable: node.Status.Allocatable, + }, + } + for _, c := range node.Status.Conditions { + newNode.Status.Conditions = append(newNode.Status.Conditions, corev1.NodeCondition{ + Type: c.Type, + Status: c.Status, + }) + } + return newNode +} + func GetMetrics(node *corev1.Node, nodeConditionTypesToReport, allocatableTypesToReport []string, logger *zap.Logger) []*agentmetricspb.ExportMetricsServiceRequest { metrics := make([]*metricspb.Metric, 0, len(nodeConditionTypesToReport)+len(allocatableTypesToReport)) // Adding 'node condition type' metrics diff --git a/receiver/k8sclusterreceiver/internal/node/nodes_test.go b/receiver/k8sclusterreceiver/internal/node/nodes_test.go index 3d6643155838..cb75c7f0b340 100644 --- a/receiver/k8sclusterreceiver/internal/node/nodes_test.go +++ b/receiver/k8sclusterreceiver/internal/node/nodes_test.go @@ -7,9 +7,12 @@ import ( "testing" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" @@ -140,3 +143,63 @@ func TestNodeConditionValue(t *testing.T) { }) } } + +func TestTransform(t *testing.T) { + originalNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-node", + UID: "my-node-uid", + Labels: map[string]string{ + "node-role": "worker", + }, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("8"), + corev1.ResourceMemory: resource.MustParse("16Gi"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + }, + Addresses: []corev1.NodeAddress{ + { + Type: corev1.NodeHostName, + Address: "my-node-hostname", + }, + { + Type: corev1.NodeInternalIP, + Address: "192.168.1.100", + }, + }, + }, + } + wantNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-node", + UID: "my-node-uid", + Labels: map[string]string{ + "node-role": "worker", + }, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + } + assert.Equal(t, wantNode, Transform(originalNode)) +} diff --git a/receiver/k8sclusterreceiver/internal/pod/pods.go b/receiver/k8sclusterreceiver/internal/pod/pods.go index 0ef9b292459b..c675478bd4b2 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods.go @@ -40,6 +40,47 @@ var podPhaseMetric = &metricspb.MetricDescriptor{ Type: metricspb.MetricDescriptor_GAUGE_INT64, } +// Transform transforms the pod to remove the fields that we don't use to reduce RAM utilization. +// IMPORTANT: Make sure to update this function when using a new pod fields. +func Transform(pod *corev1.Pod) *corev1.Pod { + newPod := &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + UID: pod.ObjectMeta.UID, + Name: pod.ObjectMeta.Name, + Namespace: pod.ObjectMeta.Namespace, + Labels: pod.ObjectMeta.Labels, + }, + Spec: corev1.PodSpec{ + NodeName: pod.Spec.NodeName, + }, + Status: corev1.PodStatus{ + Phase: pod.Status.Phase, + }, + } + for _, cs := range pod.Status.ContainerStatuses { + if cs.ContainerID == "" { + continue + } + newPod.Status.ContainerStatuses = append(newPod.Status.ContainerStatuses, corev1.ContainerStatus{ + Name: cs.Name, + Image: cs.Image, + ContainerID: cs.ContainerID, + RestartCount: cs.RestartCount, + Ready: cs.Ready, + }) + } + for _, c := range pod.Spec.Containers { + newPod.Spec.Containers = append(newPod.Spec.Containers, corev1.Container{ + Name: c.Name, + Resources: corev1.ResourceRequirements{ + Requests: c.Resources.Requests, + Limits: c.Resources.Limits, + }, + }) + } + return newPod +} + func GetMetrics(pod *corev1.Pod, logger *zap.Logger) []*agentmetricspb.ExportMetricsServiceRequest { metrics := []*metricspb.Metric{ { @@ -55,10 +96,6 @@ func GetMetrics(pod *corev1.Pod, logger *zap.Logger) []*agentmetricspb.ExportMet containerResByName := map[string]*agentmetricspb.ExportMetricsServiceRequest{} for _, cs := range pod.Status.ContainerStatuses { - if cs.ContainerID == "" { - continue - } - contLabels := container.GetAllLabels(cs, podRes.Labels, logger) containerResByName[cs.Name] = &agentmetricspb.ExportMetricsServiceRequest{Resource: container.GetResource(contLabels)} @@ -265,11 +302,6 @@ func getWorkloadProperties(ref *v1.OwnerReference, labelKey string) map[string]s func getPodContainerProperties(pod *corev1.Pod) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { km := map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{} for _, cs := range pod.Status.ContainerStatuses { - // Skip if container id returned is empty. - if cs.ContainerID == "" { - continue - } - md := container.GetMetadata(cs) km[md.ResourceID] = md } diff --git a/receiver/k8sclusterreceiver/internal/pod/pods_test.go b/receiver/k8sclusterreceiver/internal/pod/pods_test.go index de276b41ec37..4985bc86ae0b 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods_test.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods_test.go @@ -7,15 +7,18 @@ import ( "fmt" "strings" "testing" + "time" agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -356,3 +359,133 @@ func podWithOwnerReference(kind string) *corev1.Pod { }, testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{}), ).(*corev1.Pod) } + +func TestTransform(t *testing.T) { + originalPod := &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: "my-pod", + Namespace: "default", + Labels: map[string]string{ + "app": "my-app", + "version": "v1", + }, + Annotations: map[string]string{ + "example.com/annotation": "some-value", + }, + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyAlways, + NodeName: "node-1", + HostNetwork: true, + HostIPC: true, + HostPID: true, + DNSPolicy: corev1.DNSClusterFirst, + TerminationGracePeriodSeconds: func() *int64 { + gracePeriodSeconds := int64(30) + return &gracePeriodSeconds + }(), + SecurityContext: &corev1.PodSecurityContext{ + RunAsUser: func() *int64 { uid := int64(1000); return &uid }(), + RunAsGroup: func() *int64 { gid := int64(2000); return &gid }(), + FSGroup: func() *int64 { gid := int64(3000); return &gid }(), + }, + Containers: []corev1.Container{ + { + Name: "my-container", + Image: "nginx:latest", + ImagePullPolicy: corev1.PullAlways, + Ports: []corev1.ContainerPort{ + { + Name: "http", + ContainerPort: 80, + Protocol: corev1.ProtocolTCP, + }, + { + Name: "https", + ContainerPort: 443, + Protocol: corev1.ProtocolTCP, + }, + }, + Env: []corev1.EnvVar{ + { + Name: "MY_ENV", + Value: "my-value", + }, + }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + HostIP: "192.168.1.100", + PodIP: "10.244.0.5", + StartTime: &v1.Time{Time: v1.Now().Add(-5 * time.Minute)}, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "invalid-container", + Image: "redis:latest", + RestartCount: 1, + }, + { + Name: "my-container", + Image: "nginx:latest", + ContainerID: "abc12345", + RestartCount: 2, + Ready: true, + State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{StartedAt: v1.Now()}}, + }, + }, + }, + } + wantPod := &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: "my-pod", + Namespace: "default", + Labels: map[string]string{ + "app": "my-app", + "version": "v1", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + Containers: []corev1.Container{ + { + Name: "my-container", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "my-container", + Image: "nginx:latest", + ContainerID: "abc12345", + RestartCount: 2, + Ready: true, + }, + }, + }, + } + assert.Equal(t, wantPod, Transform(originalPod)) +} diff --git a/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go b/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go index 22714f5b2e5c..f1e606e5a7b4 100644 --- a/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go +++ b/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go @@ -8,6 +8,7 @@ import ( resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" @@ -15,6 +16,34 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replica" ) +// Transform transforms the replica set to remove the fields that we don't use to reduce RAM utilization. +// IMPORTANT: Make sure to update this function when using new replicaset fields. +func Transform(rs *appsv1.ReplicaSet) *appsv1.ReplicaSet { + newRS := &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: rs.ObjectMeta.Name, + Namespace: rs.ObjectMeta.Namespace, + UID: rs.ObjectMeta.UID, + CreationTimestamp: rs.ObjectMeta.CreationTimestamp, + Labels: rs.ObjectMeta.Labels, + }, + Spec: appsv1.ReplicaSetSpec{ + Replicas: rs.Spec.Replicas, + }, + Status: appsv1.ReplicaSetStatus{ + AvailableReplicas: rs.Status.AvailableReplicas, + }, + } + for _, or := range rs.ObjectMeta.OwnerReferences { + newRS.ObjectMeta.OwnerReferences = append(newRS.ObjectMeta.OwnerReferences, metav1.OwnerReference{ + Name: or.Name, + UID: or.UID, + Kind: or.Kind, + }) + } + return newRS +} + func GetMetrics(rs *appsv1.ReplicaSet) []*agentmetricspb.ExportMetricsServiceRequest { if rs.Spec.Replicas == nil { return nil diff --git a/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go b/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go index c2c3a67cf3ee..b459ce1fad8d 100644 --- a/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go +++ b/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go @@ -7,7 +7,11 @@ import ( "testing" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" @@ -36,3 +40,68 @@ func TestReplicasetMetrics(t *testing.T) { testutils.AssertMetricsInt(t, rm.Metrics[1], "k8s.replicaset.available", metricspb.MetricDescriptor_GAUGE_INT64, 2) } + +func TestTransform(t *testing.T) { + originalRS := &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-replicaset", + Namespace: "default", + Labels: map[string]string{ + "app": "my-app", + }, + }, + Spec: appsv1.ReplicaSetSpec{ + Replicas: func() *int32 { replicas := int32(3); return &replicas }(), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "my-app", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "my-app", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "my-container", + Image: "nginx:latest", + ImagePullPolicy: v1.PullAlways, + Ports: []v1.ContainerPort{ + { + Name: "http", + ContainerPort: 80, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + }, + }, + Status: appsv1.ReplicaSetStatus{ + Replicas: 3, + FullyLabeledReplicas: 3, + ReadyReplicas: 3, + AvailableReplicas: 3, + }, + } + wantRS := &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-replicaset", + Namespace: "default", + Labels: map[string]string{ + "app": "my-app", + }, + }, + Spec: appsv1.ReplicaSetSpec{ + Replicas: func() *int32 { replicas := int32(3); return &replicas }(), + }, + Status: appsv1.ReplicaSetStatus{ + AvailableReplicas: 3, + }, + } + assert.Equal(t, wantRS, Transform(originalRS)) +} diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index b403092497ed..029cacd56c81 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -217,7 +217,11 @@ func (rw *resourceWatcher) startWatchingResources(ctx context.Context, inf share // setupInformer adds event handlers to informers and setups a metadataStore. func (rw *resourceWatcher) setupInformer(gvk schema.GroupVersionKind, informer cache.SharedIndexInformer) { - _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + err := informer.SetTransform(transformObject) + if err != nil { + rw.logger.Error("error setting informer transform function", zap.Error(err)) + } + _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: rw.onAdd, UpdateFunc: rw.onUpdate, DeleteFunc: rw.onDelete,