Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/k8s_cluster] Do not store unused data in the k8s API cache #23417

Merged
merged 1 commit into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .chloggen/k8scluster-dont-store-unused-data-cache.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/k8s_cluster

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Do not store unused data in the k8s API cache to reduce RAM usage

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23417]
31 changes: 31 additions & 0 deletions receiver/k8sclusterreceiver/informer_transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver"

import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset"
)

// transformObject transforms the k8s object by removing the data that is not utilized by the receiver.
// Only highly utilized objects are transformed here while others are kept as is.
func transformObject(object interface{}) (interface{}, error) {
switch o := object.(type) {
case *corev1.Pod:
return pod.Transform(o), nil
case *corev1.Node:
return node.Transform(o), nil
case *appsv1.ReplicaSet:
return replicaset.Transform(o), nil
case *batchv1.Job:
return jobs.Transform(o), nil
}
return object, nil
}
87 changes: 87 additions & 0 deletions receiver/k8sclusterreceiver/informer_transform_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sclusterreceiver

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
)

func TestTransformObject(t *testing.T) {
i := 1
intPtr := &i
tests := []struct {
name string
object interface{}
want interface{}
same bool
}{
{
name: "pod",
object: testutils.NewPodWithContainer(
"1",
testutils.NewPodSpecWithContainer("container-name"),
testutils.NewPodStatusWithContainer("container-name", "container-id"),
),
want: func() *corev1.Pod {
pod := testutils.NewPodWithContainer(
"1",
testutils.NewPodSpecWithContainer("container-name"),
testutils.NewPodStatusWithContainer("container-name", "container-id"),
)
pod.Spec.Containers[0].Image = ""
pod.Status.ContainerStatuses[0].State = corev1.ContainerState{}
return pod
}(),
same: false,
},
{
name: "node",
object: testutils.NewNode("1"),
want: testutils.NewNode("1"),
same: false,
},
{
name: "replicaset",
object: testutils.NewReplicaSet("1"),
want: testutils.NewReplicaSet("1"),
same: false,
},
{
name: "job",
object: testutils.NewJob("1"),
want: testutils.NewJob("1"),
same: false,
},
{
// This is a case where we don't transform the object.
name: "hpa",
object: testutils.NewHPA("1"),
want: testutils.NewHPA("1"),
same: true,
},
{
name: "invalid_type",
object: intPtr,
want: intPtr,
same: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := transformObject(tt.object)
assert.NoError(t, err)
assert.Equal(t, tt.want, got)
if tt.same {
assert.Same(t, tt.object, got)
} else {
assert.NotSame(t, tt.object, got)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,6 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
},
},
},
{
name: "Empty container id skips container resource",
metadataStore: &metadata.Store{},
resource: testutils.NewPodWithContainer(
"0",
testutils.NewPodSpecWithContainer("container-name"),
testutils.NewPodStatusWithContainer("container-name", ""),
),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
ResourceIDKey: "k8s.pod.uid",
ResourceID: "test-pod-0-uid",
Metadata: commonPodMetadata,
},
},
},
{
name: "Pod with Owner Reference",
metadataStore: &metadata.Store{},
Expand Down
23 changes: 23 additions & 0 deletions receiver/k8sclusterreceiver/internal/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
Expand Down Expand Up @@ -51,6 +52,28 @@ var podsSuccessfulMetric = &metricspb.MetricDescriptor{
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

// Transform transforms the job to remove the fields that we don't use to reduce RAM utilization.
// IMPORTANT: Make sure to update this function when using a new job fields.
func Transform(job *batchv1.Job) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: job.ObjectMeta.Name,
Namespace: job.ObjectMeta.Namespace,
UID: job.ObjectMeta.UID,
Labels: job.ObjectMeta.Labels,
},
Spec: batchv1.JobSpec{
Completions: job.Spec.Completions,
Parallelism: job.Spec.Parallelism,
},
Status: batchv1.JobStatus{
Active: job.Status.Active,
Succeeded: job.Status.Succeeded,
Failed: job.Status.Failed,
},
}
}

func GetMetrics(j *batchv1.Job) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := make([]*metricspb.Metric, 0, 5)
metrics = append(metrics, []*metricspb.Metric{
Expand Down
70 changes: 70 additions & 0 deletions receiver/k8sclusterreceiver/internal/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import (
"testing"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
Expand Down Expand Up @@ -60,3 +64,69 @@ func TestJobMetrics(t *testing.T) {
testutils.AssertMetricsInt(t, actualResourceMetrics[0].Metrics[2], "k8s.job.successful_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 3)
}

func TestTransform(t *testing.T) {
originalJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "my-job",
Namespace: "default",
UID: "my-job-uid",
Labels: map[string]string{
"app": "my-app",
},
},
Spec: batchv1.JobSpec{
Completions: func() *int32 { completions := int32(1); return &completions }(),
Parallelism: func() *int32 { parallelism := int32(1); return &parallelism }(),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "my-app",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "my-container",
Image: "busybox",
Command: []string{"echo", "Hello, World!"},
ImagePullPolicy: corev1.PullAlways,
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
},
Status: batchv1.JobStatus{
Active: 1,
Succeeded: 2,
Failed: 3,
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobComplete,
Status: corev1.ConditionTrue,
},
},
},
}
wantJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "my-job",
Namespace: "default",
UID: "my-job-uid",
Labels: map[string]string{
"app": "my-app",
},
},
Spec: batchv1.JobSpec{
Completions: func() *int32 { completions := int32(1); return &completions }(),
Parallelism: func() *int32 { parallelism := int32(1); return &parallelism }(),
},
Status: batchv1.JobStatus{
Active: 1,
Succeeded: 2,
Failed: 3,
},
}
assert.Equal(t, wantJob, Transform(originalJob))
}
23 changes: 23 additions & 0 deletions receiver/k8sclusterreceiver/internal/node/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
Expand All @@ -34,6 +35,28 @@ var allocatableDesciption = map[string]string{
"storage": "How many bytes of storage remaining that the node can allocate to pods",
}

// Transform transforms the node to remove the fields that we don't use to reduce RAM utilization.
// IMPORTANT: Make sure to update this function when using a new node fields.
func Transform(node *corev1.Node) *corev1.Node {
newNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: node.ObjectMeta.Name,
UID: node.ObjectMeta.UID,
Labels: node.ObjectMeta.Labels,
},
Status: corev1.NodeStatus{
Allocatable: node.Status.Allocatable,
},
}
for _, c := range node.Status.Conditions {
newNode.Status.Conditions = append(newNode.Status.Conditions, corev1.NodeCondition{
Type: c.Type,
Status: c.Status,
})
}
return newNode
}

func GetMetrics(node *corev1.Node, nodeConditionTypesToReport, allocatableTypesToReport []string, logger *zap.Logger) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := make([]*metricspb.Metric, 0, len(nodeConditionTypesToReport)+len(allocatableTypesToReport))
// Adding 'node condition type' metrics
Expand Down
63 changes: 63 additions & 0 deletions receiver/k8sclusterreceiver/internal/node/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"testing"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
Expand Down Expand Up @@ -140,3 +143,63 @@ func TestNodeConditionValue(t *testing.T) {
})
}
}

func TestTransform(t *testing.T) {
originalNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "my-node",
UID: "my-node-uid",
Labels: map[string]string{
"node-role": "worker",
},
},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
Capacity: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("8"),
corev1.ResourceMemory: resource.MustParse("16Gi"),
},
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
},
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeHostName,
Address: "my-node-hostname",
},
{
Type: corev1.NodeInternalIP,
Address: "192.168.1.100",
},
},
},
}
wantNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "my-node",
UID: "my-node-uid",
Labels: map[string]string{
"node-role": "worker",
},
},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
},
},
}
assert.Equal(t, wantNode, Transform(originalNode))
}
Loading