From b7331b31c60806f7ca466a753657fc0940a9758d Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 9 Nov 2020 16:18:09 +0100 Subject: [PATCH] Fix duplicated events in kubernetes autodiscover for pods with init or ephemeral containers (#22438) (#22492) (cherry picked from commit 3dd693129f8e71755b8d5ed8d2252bb236a44210) --- CHANGELOG.next.asciidoc | 1 + .../autodiscover/providers/kubernetes/pod.go | 23 +- .../providers/kubernetes/pod_test.go | 359 +++++++++++++++++- 3 files changed, 376 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e9d9029cb79..d90f4aa56b0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -158,6 +158,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349] - Fix memory leak and events duplication in docker autodiscover and add_docker_metadata. {pull}21851[21851] - Fix parsing of expired licences. {issue}21112[21112] {pull}22180[22180] +- Fix duplicated pod events in kubernetes autodiscover for pods with init or ephemeral containers. {pull}22438[22438] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 0fb9d53d6a2..b7ef1e0150f 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -266,20 +266,33 @@ func (p *pod) Stop() { } func (p *pod) emit(pod *kubernetes.Pod, flag string) { + containers, statuses := getContainersInPod(pod) + p.emitEvents(pod, flag, containers, statuses) +} + +// getContainersInPod returns all the containers defined in a pod and their statuses. +// It includes init and ephemeral containers. +func getContainersInPod(pod *kubernetes.Pod) ([]kubernetes.Container, []kubernetes.PodContainerStatus) { + var containers []kubernetes.Container + var statuses []kubernetes.PodContainerStatus + // Emit events for all containers - p.emitEvents(pod, flag, pod.Spec.Containers, pod.Status.ContainerStatuses) + containers = append(containers, pod.Spec.Containers...) + statuses = append(statuses, pod.Status.ContainerStatuses...) // Emit events for all initContainers - p.emitEvents(pod, flag, pod.Spec.InitContainers, pod.Status.InitContainerStatuses) + containers = append(containers, pod.Spec.InitContainers...) + statuses = append(statuses, pod.Status.InitContainerStatuses...) // Emit events for all ephemeralContainers // Ephemeral containers are alpha feature in k8s and this code may require some changes, if their // api change in the future. - var mappedEphemeralsAsContainers []kubernetes.Container for _, c := range pod.Spec.EphemeralContainers { - mappedEphemeralsAsContainers = append(mappedEphemeralsAsContainers, kubernetes.Container(c.EphemeralContainerCommon)) + containers = append(containers, kubernetes.Container(c.EphemeralContainerCommon)) } - p.emitEvents(pod, flag, mappedEphemeralsAsContainers, pod.Status.EphemeralContainerStatuses) + statuses = append(statuses, pod.Status.EphemeralContainerStatuses...) + + return containers, statuses } func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernetes.Container, diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 1c4ec983b82..31c5d438e20 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -1024,6 +1024,118 @@ func TestEmitEvent(t *testing.T) { }, }, }, + { + Message: "Test init container in common pod", + Flag: "start", + Pod: &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.PodStatus{ + PodIP: podIP, + InitContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, + ContainerID: containerID, + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + InitContainers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + }, + }, + }, + }, + Expected: []bus.Event{ + { + "start": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "ports": common.MapStr{}, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "host": "127.0.0.1", + "port": 0, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "id": "foobar", + "runtime": "docker", + }, + }, + "config": []*common.Config{}, + }, + }, + }, { Message: "Test ephemeral container in common pod", Flag: "start", @@ -1138,6 +1250,244 @@ func TestEmitEvent(t *testing.T) { }, }, }, + { + Message: "Test pod with ephemeral, init and normal container", + Flag: "start", + Pod: &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.PodStatus{ + PodIP: podIP, + InitContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, + ContainerID: containerID, + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name + "-init", + ContainerID: containerID + "-init", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + EphemeralContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name + "-ephemeral", + ContainerID: containerID + "-ephemeral", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + }, + }, + InitContainers: []kubernetes.Container{ + { + Image: containerImage, + Name: name + "-init", + }, + }, + EphemeralContainers: []v1.EphemeralContainer{ + v1.EphemeralContainer{ + EphemeralContainerCommon: v1.EphemeralContainerCommon{ + Image: containerImage, + Name: name + "-ephemeral", + }, + }, + }, + }, + }, + Expected: []bus.Event{ + // Single pod + { + "start": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "ports": common.MapStr{}, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + // Container + { + "start": true, + "host": "127.0.0.1", + "port": 0, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "id": "foobar", + "runtime": "docker", + }, + }, + "config": []*common.Config{}, + }, + // Init container + { + "start": true, + "host": "127.0.0.1", + "port": 0, + "id": cid + "-init", + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar-init", + "name": "filebeat-init", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + "container": common.MapStr{ + "name": "filebeat-init", + "image": "elastic/filebeat:6.3.0", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "id": "foobar-init", + "runtime": "docker", + }, + }, + "config": []*common.Config{}, + }, + // Ephemeral container + { + "start": true, + "host": "127.0.0.1", + "port": 0, + "id": cid + "-ephemeral", + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar-ephemeral", + "name": "filebeat-ephemeral", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + "container": common.MapStr{ + "name": "filebeat-ephemeral", + "image": "elastic/filebeat:6.3.0", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "id": "foobar-ephemeral", + "runtime": "docker", + }, + }, + "config": []*common.Config{}, + }, + }, + }, } for _, test := range tests { @@ -1172,14 +1522,19 @@ func TestEmitEvent(t *testing.T) { for i := 0; i < len(test.Expected); i++ { select { case event := <-listener.Events(): - assert.Equal(t, test.Expected[i], event, test.Message) + assert.Equalf(t, test.Expected[i], event, "%s/#%d", test.Message, i) case <-time.After(2 * time.Second): if test.Expected != nil { - t.Fatal("Timeout while waiting for event") + t.Fatalf("Timeout while waiting for event #%d", i) } } } + select { + case <-listener.Events(): + t.Error("More events received than expected") + default: + } }) } }