diff --git a/.chloggen/k8s-attrs-fix-ns-node-labels-annotation-setting.yaml b/.chloggen/k8s-attrs-fix-ns-node-labels-annotation-setting.yaml new file mode 100644 index 000000000000..968d79c41e68 --- /dev/null +++ b/.chloggen/k8s-attrs-fix-ns-node-labels-annotation-setting.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/k8sattributes + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Set attributes from namespace/node labels or annotations even if node/namespaces attribute are not set. + +# One or more tracking issues related to the change +issues: [28837] diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 2a10b9b86afa..e158a3e62f05 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -113,8 +113,10 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco return } + var pod *kube.Pod if podIdentifierValue.IsNotEmpty() { - if pod, ok := kp.kc.GetPod(podIdentifierValue); ok { + var podFound bool + if pod, podFound = kp.kc.GetPod(podIdentifierValue); podFound { kp.logger.Debug("getting the pod", zap.Any("pod", pod)) for key, val := range pod.Attributes { @@ -126,7 +128,7 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco } } - namespace := stringAttributeFromMap(resource.Attributes(), conventions.AttributeK8SNamespaceName) + namespace := getNamespace(pod, resource.Attributes()) if namespace != "" { attrsToAdd := kp.getAttributesForPodsNamespace(namespace) for key, val := range attrsToAdd { @@ -136,7 +138,7 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco } } - nodeName := stringAttributeFromMap(resource.Attributes(), conventions.AttributeK8SNodeName) + nodeName := getNodeName(pod, resource.Attributes()) if nodeName != "" { attrsToAdd := kp.getAttributesForPodsNode(nodeName) for key, val := range attrsToAdd { @@ -147,6 +149,20 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco } } +func getNamespace(pod *kube.Pod, resAttrs pcommon.Map) string { + if pod != nil && pod.Namespace != "" { + return pod.Namespace + } + return stringAttributeFromMap(resAttrs, conventions.AttributeK8SNamespaceName) +} + +func getNodeName(pod *kube.Pod, resAttrs pcommon.Map) string { + if pod != nil && pod.NodeName != "" { + return pod.NodeName + } + return stringAttributeFromMap(resAttrs, conventions.AttributeK8SNodeName) +} + // addContainerAttributes looks if pod has any container identifiers and adds additional container attributes func (kp *kubernetesprocessor) addContainerAttributes(attrs pcommon.Map, pod *kube.Pod) { containerName := stringAttributeFromMap(attrs, conventions.AttributeK8SContainerName) diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go index be0fd7118582..ac7bf6587dc0 100644 --- a/processor/k8sattributesprocessor/processor_test.go +++ b/processor/k8sattributesprocessor/processor_test.go @@ -659,7 +659,7 @@ func TestPodUID(t *testing.T) { }) } -func TestProcessorAddLabels(t *testing.T) { +func TestAddPodLabels(t *testing.T) { m := newMultiTest( t, NewFactory().CreateDefaultConfig(), @@ -727,6 +727,150 @@ func TestProcessorAddLabels(t *testing.T) { } } +func TestAddNamespaceLabels(t *testing.T) { + m := newMultiTest( + t, + func() component.Config { + cfg := createDefaultConfig().(*Config) + cfg.Extract.Metadata = []string{} + cfg.Extract.Labels = []FieldExtractConfig{ + { + From: kube.MetadataFromNamespace, + Key: "namespace-label", + }, + } + return cfg + }(), + nil, + ) + + podIP := "1.1.1.1" + namespaces := map[string]map[string]string{ + "namespace-1": { + "nslabel": "1", + }, + "namespace-2": { + "nslabel": "2", + }, + } + m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) { + kp.podAssociations = []kube.Association{ + { + Sources: []kube.AssociationSource{ + { + From: "connection", + }, + }, + }, + } + }) + + m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) { + pi := kube.PodIdentifier{ + kube.PodIdentifierAttributeFromConnection(podIP), + } + kp.kc.(*fakeClient).Pods[pi] = &kube.Pod{Name: "test-2323", Namespace: "namespace-1"} + kp.kc.(*fakeClient).Namespaces = make(map[string]*kube.Namespace) + for ns, labels := range namespaces { + kp.kc.(*fakeClient).Namespaces[ns] = &kube.Namespace{Attributes: labels} + } + }) + + ctx := client.NewContext(context.Background(), client.Info{ + Addr: &net.IPAddr{ + IP: net.ParseIP(podIP), + }, + }) + m.testConsume( + ctx, + generateTraces(), + generateMetrics(), + generateLogs(), + func(err error) { + assert.NoError(t, err) + }) + + m.assertBatchesLen(1) + m.assertResourceObjectLen(0) + m.assertResource(0, func(res pcommon.Resource) { + assert.Equal(t, 2, res.Attributes().Len()) + assertResourceHasStringAttribute(t, res, "k8s.pod.ip", podIP) + assertResourceHasStringAttribute(t, res, "nslabel", "1") + }) +} + +func TestAddNodeLabels(t *testing.T) { + m := newMultiTest( + t, + func() component.Config { + cfg := createDefaultConfig().(*Config) + cfg.Extract.Metadata = []string{} + cfg.Extract.Labels = []FieldExtractConfig{ + { + From: kube.MetadataFromNode, + Key: "node-label", + }, + } + return cfg + }(), + nil, + ) + + podIP := "1.1.1.1" + nodes := map[string]map[string]string{ + "node-1": { + "nodelabel": "1", + }, + "node-2": { + "nodelabel": "2", + }, + } + m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) { + kp.podAssociations = []kube.Association{ + { + Sources: []kube.AssociationSource{ + { + From: "connection", + }, + }, + }, + } + }) + + m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) { + pi := kube.PodIdentifier{ + kube.PodIdentifierAttributeFromConnection(podIP), + } + kp.kc.(*fakeClient).Pods[pi] = &kube.Pod{Name: "test-2323", NodeName: "node-1"} + kp.kc.(*fakeClient).Nodes = make(map[string]*kube.Node) + for ns, labels := range nodes { + kp.kc.(*fakeClient).Nodes[ns] = &kube.Node{Attributes: labels} + } + }) + + ctx := client.NewContext(context.Background(), client.Info{ + Addr: &net.IPAddr{ + IP: net.ParseIP(podIP), + }, + }) + m.testConsume( + ctx, + generateTraces(), + generateMetrics(), + generateLogs(), + func(err error) { + assert.NoError(t, err) + }) + + m.assertBatchesLen(1) + m.assertResourceObjectLen(0) + m.assertResource(0, func(res pcommon.Resource) { + assert.Equal(t, 2, res.Attributes().Len()) + assertResourceHasStringAttribute(t, res, "k8s.pod.ip", podIP) + assertResourceHasStringAttribute(t, res, "nodelabel", "1") + }) +} + func TestProcessorAddContainerAttributes(t *testing.T) { tests := []struct { name string