From e1198f75103337a5ed287d1601a456ee1d995c56 Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 11 Jan 2021 17:01:39 +0200 Subject: [PATCH 1/6] Handle DeletedFinalStateUnknown in k8s OnDelete Signed-off-by: chrismark --- .../autodiscover/providers/kubernetes/node.go | 18 +++++++++++++++- .../autodiscover/providers/kubernetes/pod.go | 16 +++++++++++++- .../providers/kubernetes/service.go | 21 ++++++++++++++++--- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/node.go b/libbeat/autodiscover/providers/kubernetes/node.go index 6019364991b..92d58f3257a 100644 --- a/libbeat/autodiscover/providers/kubernetes/node.go +++ b/libbeat/autodiscover/providers/kubernetes/node.go @@ -19,12 +19,14 @@ package kubernetes import ( "fmt" + "github.com/golang/glog" "time" "github.com/gofrs/uuid" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/builder" "github.com/elastic/beats/v7/libbeat/common" @@ -114,7 +116,21 @@ func (n *node) OnUpdate(obj interface{}) { // OnDelete ensures processing of node objects that are deleted func (n *node) OnDelete(obj interface{}) { n.logger.Debugf("Watcher Node delete: %+v", obj) - time.AfterFunc(n.config.CleanupTimeout, func() { n.emit(obj.(*kubernetes.Node), "stop") }) + node, isNode := obj.(*kubernetes.Node) + // We can get DeletedFinalStateUnknown instead of *kubernetes.Node here and we need to handle that correctly. #23385 + if !isNode { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Received unexpected object: %v", obj) + return + } + node, ok = deletedState.Obj.(*kubernetes.Node) + if !ok { + glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj) + return + } + } + time.AfterFunc(n.config.CleanupTimeout, func() { n.emit(node, "stop") }) } // GenerateHints creates hints needed for hints builder diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 02e273663ba..6c642908488 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -163,7 +163,21 @@ func (p *pod) OnUpdate(obj interface{}) { // OnDelete stops pod objects that are deleted func (p *pod) OnDelete(obj interface{}) { p.logger.Debugf("Watcher Pod delete: %+v", obj) - time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) + pod, isPod := obj.(*kubernetes.Pod) + // We can get DeletedFinalStateUnknown instead of *kubernetes.Pod here and we need to handle that correctly. #23385 + if !isPod { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + p.logger.Debugf("Received unexpected object: %+v", obj) + return + } + pod, ok = deletedState.Obj.(*kubernetes.Pod) + if !ok { + p.logger.Debugf("DeletedFinalStateUnknown contained non-Pod object: %+v", deletedState.Obj) + return + } + } + time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) } // GenerateHints creates hints needed for hints builder diff --git a/libbeat/autodiscover/providers/kubernetes/service.go b/libbeat/autodiscover/providers/kubernetes/service.go index 2aa699c50ad..b3dd71882c0 100644 --- a/libbeat/autodiscover/providers/kubernetes/service.go +++ b/libbeat/autodiscover/providers/kubernetes/service.go @@ -19,17 +19,18 @@ package kubernetes import ( "fmt" + "github.com/golang/glog" "time" - "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" - "github.com/gofrs/uuid" k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/builder" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/bus" "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "github.com/elastic/beats/v7/libbeat/common/safemapstr" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -114,7 +115,21 @@ func (s *service) OnUpdate(obj interface{}) { // OnDelete ensures processing of service objects that are deleted func (s *service) OnDelete(obj interface{}) { s.logger.Debugf("Watcher service delete: %+v", obj) - time.AfterFunc(s.config.CleanupTimeout, func() { s.emit(obj.(*kubernetes.Service), "stop") }) + service, isNode := obj.(*kubernetes.Service) + // We can get DeletedFinalStateUnknown instead of *kubernetes.Service here and we need to handle that correctly. #23385 + if !isNode { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Received unexpected object: %v", obj) + return + } + service, ok = deletedState.Obj.(*kubernetes.Service) + if !ok { + glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj) + return + } + } + time.AfterFunc(s.config.CleanupTimeout, func() { s.emit(service, "stop") }) } // GenerateHints creates hints needed for hints builder From 78c5c56c803ae33d8848dff2c98faef940d7f04f Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 11 Jan 2021 17:08:19 +0200 Subject: [PATCH 2/6] fix Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/node.go | 5 ++--- libbeat/autodiscover/providers/kubernetes/service.go | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/node.go b/libbeat/autodiscover/providers/kubernetes/node.go index 92d58f3257a..aad9a9d358a 100644 --- a/libbeat/autodiscover/providers/kubernetes/node.go +++ b/libbeat/autodiscover/providers/kubernetes/node.go @@ -19,7 +19,6 @@ package kubernetes import ( "fmt" - "github.com/golang/glog" "time" "github.com/gofrs/uuid" @@ -121,12 +120,12 @@ func (n *node) OnDelete(obj interface{}) { if !isNode { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Received unexpected object: %v", obj) + n.logger.Errorf("Received unexpected object: %+v", obj) return } node, ok = deletedState.Obj.(*kubernetes.Node) if !ok { - glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj) + n.logger.Errorf("DeletedFinalStateUnknown contained non-Node object: %+v", deletedState.Obj) return } } diff --git a/libbeat/autodiscover/providers/kubernetes/service.go b/libbeat/autodiscover/providers/kubernetes/service.go index b3dd71882c0..86265b3a0c7 100644 --- a/libbeat/autodiscover/providers/kubernetes/service.go +++ b/libbeat/autodiscover/providers/kubernetes/service.go @@ -19,7 +19,6 @@ package kubernetes import ( "fmt" - "github.com/golang/glog" "time" "github.com/gofrs/uuid" @@ -120,12 +119,12 @@ func (s *service) OnDelete(obj interface{}) { if !isNode { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Received unexpected object: %v", obj) + s.logger.Errorf("Received unexpected object: %+v", obj) return } service, ok = deletedState.Obj.(*kubernetes.Service) if !ok { - glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj) + s.logger.Errorf("DeletedFinalStateUnknown contained non-Node object: %+v", deletedState.Obj) return } } From 3f63e323fae8906fdb157b8e980e0f32f95260cd Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 11 Jan 2021 17:09:30 +0200 Subject: [PATCH 3/6] fixup Signed-off-by: chrismark --- libbeat/autodiscover/providers/kubernetes/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/autodiscover/providers/kubernetes/service.go b/libbeat/autodiscover/providers/kubernetes/service.go index 86265b3a0c7..158f73ce4d9 100644 --- a/libbeat/autodiscover/providers/kubernetes/service.go +++ b/libbeat/autodiscover/providers/kubernetes/service.go @@ -124,7 +124,7 @@ func (s *service) OnDelete(obj interface{}) { } service, ok = deletedState.Obj.(*kubernetes.Service) if !ok { - s.logger.Errorf("DeletedFinalStateUnknown contained non-Node object: %+v", deletedState.Obj) + s.logger.Errorf("DeletedFinalStateUnknown contained non-Service object: %+v", deletedState.Obj) return } } From 4de58571994290dc18804b07570cb910179b25ed Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 12 Jan 2021 10:45:15 +0200 Subject: [PATCH 4/6] Add tests Signed-off-by: chrismark --- .../providers/kubernetes/node_test.go | 190 +++++++++++ .../providers/kubernetes/pod_test.go | 306 ++++++++++++++++++ .../providers/kubernetes/service_test.go | 169 ++++++++++ 3 files changed, 665 insertions(+) diff --git a/libbeat/autodiscover/providers/kubernetes/node_test.go b/libbeat/autodiscover/providers/kubernetes/node_test.go index 6eb22b185e1..429a7c8ed23 100644 --- a/libbeat/autodiscover/providers/kubernetes/node_test.go +++ b/libbeat/autodiscover/providers/kubernetes/node_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/template" "github.com/elastic/beats/v7/libbeat/common" @@ -345,6 +346,195 @@ func TestEmitEvent_Node(t *testing.T) { } } +func TestNode_OnDelete(t *testing.T) { + name := "metricbeat" + nameUnknown := "metricbeat-unknown" + nodeIP := "192.168.0.1" + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + UUID, err := uuid.NewV4() + + typeMeta := metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + } + if err != nil { + t.Fatal(err) + } + + tests := []struct { + Message string + Flag string + Node *kubernetes.Node + Expected bus.Event + }{ + { + Message: "Test node stop", + Node: &kubernetes.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeExternalIP, + Address: nodeIP, + }, + { + Type: v1.NodeInternalIP, + Address: "1.2.3.4", + }, + { + Type: v1.NodeHostName, + Address: "node1", + }, + }, + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + }, + Expected: bus.Event{ + "stop": true, + "host": "192.168.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "node": common.MapStr{ + "name": "metricbeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "hostname": "node1", + }, + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "node": common.MapStr{ + "name": "metricbeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "hostname": "node1", + }, + }, + }, + "config": []*common.Config{}, + }, + }, + { + Message: "Test node stop with DeletedFinalStateUnknown", + Node: &kubernetes.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nameUnknown, + UID: types.UID(uid), + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeExternalIP, + Address: nodeIP, + }, + { + Type: v1.NodeInternalIP, + Address: "1.2.3.4", + }, + { + Type: v1.NodeHostName, + Address: "node1", + }, + }, + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + }, + Expected: bus.Event{ + "stop": true, + "host": "192.168.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "node": common.MapStr{ + "name": nameUnknown, + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "hostname": "node1", + }, + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "node": common.MapStr{ + "name": nameUnknown, + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "hostname": "node1", + }, + }, + }, + "config": []*common.Config{}, + }, + }, + } + + for _, test := range tests { + t.Run(test.Message, func(t *testing.T) { + mapper, err := template.NewConfigMapper(nil, nil, nil) + if err != nil { + t.Fatal(err) + } + + metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil) + config := defaultConfig() + p := &Provider{ + config: config, + bus: bus.New(logp.NewLogger("bus"), "test"), + templates: mapper, + logger: logp.NewLogger("kubernetes"), + } + + no := &node{ + metagen: metaGen, + config: defaultConfig(), + publish: p.publish, + uuid: UUID, + logger: logp.NewLogger("kubernetes.no"), + } + no.config.CleanupTimeout = 1 * time.Second + p.eventManager = NewMockNodeEventerManager(no) + + listener := p.bus.Subscribe() + + if test.Node.Name == nameUnknown { + deletedState := cache.DeletedFinalStateUnknown{ + Key: "testnode", + Obj: test.Node, + } + no.OnDelete(deletedState) + } else { + no.OnDelete(test.Node) + } + + select { + case event := <-listener.Events(): + assert.Equal(t, test.Expected, event, test.Message) + case <-time.After(4 * time.Second): + if test.Expected != nil { + t.Fatal("Timeout while waiting for event") + } + } + }) + } +} + func NewMockNodeEventerManager(no *node) EventManager { em := &eventerManager{} em.eventer = no diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 1673d295ffb..46839cac981 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/template" "github.com/elastic/beats/v7/libbeat/common" @@ -1540,6 +1541,311 @@ func TestEmitEvent(t *testing.T) { } } +func TestPod_OnDelete(t *testing.T) { + name := "filebeat" + nameUnknown := "filebeat-unknown" + namespace := "default" + podIP := "127.0.0.1" + containerID := "docker://foobar" + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + containerImage := "elastic/filebeat:6.3.0" + node := "node" + cid := "005f3b90-4b9d-12f8-acf0-31020a840133.filebeat" + UUID, err := uuid.NewV4() + if err != nil { + t.Fatal(err) + } + + typeMeta := metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + } + + tests := []struct { + Message string + Flag string + Pod *kubernetes.Pod + Expected []bus.Event + }{ + { + Message: "Test stop pod", + Flag: "stop", + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.PodStatus{ + PodIP: podIP, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, + ContainerID: containerID, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + }, + }, + }, + }, + Expected: []bus.Event{ + { + "stop": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "ports": common.MapStr{}, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "stop": true, + "host": "127.0.0.1", + "port": 0, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "runtime": "docker", + "id": "foobar", + }, + }, + "config": []*common.Config{}, + }, + }, + }, + { + Message: "Test stop pod with DeletedFinalStateUnknown", + Flag: "stop", + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: nameUnknown, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.PodStatus{ + PodIP: podIP, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, + ContainerID: containerID, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + }, + }, + }, + }, + Expected: []bus.Event{ + { + "stop": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "ports": common.MapStr{}, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": nameUnknown, + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": nameUnknown, + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "stop": true, + "host": "127.0.0.1", + "port": 0, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": nameUnknown, + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, + "pod": common.MapStr{ + "name": nameUnknown, + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "runtime": "docker", + "id": "foobar", + }, + }, + "config": []*common.Config{}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.Message, func(t *testing.T) { + mapper, err := template.NewConfigMapper(nil, nil, nil) + if err != nil { + t.Fatal(err) + } + + metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil) + p := &Provider{ + config: defaultConfig(), + bus: bus.New(logp.NewLogger("bus"), "test"), + templates: mapper, + logger: logp.NewLogger("kubernetes"), + } + + pub := &publisher{b: p.bus} + pod := &pod{ + metagen: metaGen, + config: defaultConfig(), + publish: pub.publish, + uuid: UUID, + logger: logp.NewLogger("kubernetes.pod"), + } + pod.config.CleanupTimeout = 1 * time.Second + p.eventManager = NewMockPodEventerManager(pod) + + listener := p.bus.Subscribe() + + if test.Pod.Name == nameUnknown { + deletedState := cache.DeletedFinalStateUnknown{ + Key: "testpod", + Obj: test.Pod, + } + pod.OnDelete(deletedState) + } else { + pod.OnDelete(test.Pod) + } + + for i := 0; i < len(test.Expected); i++ { + select { + case event := <-listener.Events(): + assert.Equalf(t, test.Expected[i], event, "%s/#%d", test.Message, i) + case <-time.After(2 * time.Second): + if test.Expected != nil { + t.Fatalf("Timeout while waiting for event #%d", i) + } + } + } + + select { + case <-listener.Events(): + t.Error("More events received than expected") + default: + } + }) + } + +} + func NewMockPodEventerManager(pod *pod) EventManager { em := &eventerManager{} em.eventer = pod diff --git a/libbeat/autodiscover/providers/kubernetes/service_test.go b/libbeat/autodiscover/providers/kubernetes/service_test.go index 999ba853117..75c8ac52941 100644 --- a/libbeat/autodiscover/providers/kubernetes/service_test.go +++ b/libbeat/autodiscover/providers/kubernetes/service_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/template" "github.com/elastic/beats/v7/libbeat/common" @@ -429,6 +430,174 @@ func TestEmitEvent_Service(t *testing.T) { } } +func TestService_OnDelete(t *testing.T) { + name := "metricbeat" + nameUnknown := "metricbeat-unknown" + namespace := "default" + clusterIP := "192.168.0.1" + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + UUID, err := uuid.NewV4() + if err != nil { + t.Fatal(err) + } + + typeMeta := metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + } + + tests := []struct { + Message string + Flag string + Service *kubernetes.Service + Expected bus.Event + }{ + { + Message: "Test service stop", + Flag: "stop", + Service: &kubernetes.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "http", + Port: 8080, + }, + }, + ClusterIP: clusterIP, + }, + }, + Expected: bus.Event{ + "stop": true, + "host": "192.168.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "service": common.MapStr{ + "name": "metricbeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "service": common.MapStr{ + "name": "metricbeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + }, + }, + "config": []*common.Config{}, + }, + }, + { + Message: "Test service stop with DeletedFinalStateUnknown", + Flag: "stop", + Service: &kubernetes.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: nameUnknown, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "http", + Port: 8080, + }, + }, + ClusterIP: clusterIP, + }, + }, + Expected: bus.Event{ + "stop": true, + "host": "192.168.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "service": common.MapStr{ + "name": nameUnknown, + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "service": common.MapStr{ + "name": nameUnknown, + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + }, + }, + "config": []*common.Config{}, + }, + }, + } + + for _, test := range tests { + t.Run(test.Message, func(t *testing.T) { + mapper, err := template.NewConfigMapper(nil, nil, nil) + if err != nil { + t.Fatal(err) + } + + metaGen := metadata.NewServiceMetadataGenerator(common.NewConfig(), nil, nil) + + p := &Provider{ + config: defaultConfig(), + bus: bus.New(logp.NewLogger("bus"), "test"), + templates: mapper, + logger: logp.NewLogger("kubernetes"), + } + + service := &service{ + metagen: metaGen, + config: defaultConfig(), + publish: p.publish, + uuid: UUID, + logger: logp.NewLogger("kubernetes.service"), + } + service.config.CleanupTimeout = 1 * time.Second + p.eventManager = NewMockServiceEventerManager(service) + + listener := p.bus.Subscribe() + + if test.Service.Name == nameUnknown { + deletedState := cache.DeletedFinalStateUnknown{ + Key: "testsvc", + Obj: test.Service, + } + service.OnDelete(deletedState) + } else { + service.OnDelete(test.Service) + } + + select { + case event := <-listener.Events(): + assert.Equal(t, test.Expected, event, test.Message) + case <-time.After(4 * time.Second): + if test.Expected != nil { + t.Fatal("Timeout while waiting for event") + } + } + }) + } +} + func NewMockServiceEventerManager(svc *service) EventManager { em := &eventerManager{} em.eventer = svc From bf712062819057ba5f958811094a83f8c583e01a Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 12 Jan 2021 10:46:52 +0200 Subject: [PATCH 5/6] Add changelong Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bf89e89479d..0da1838066e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -229,6 +229,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix typo in config docs {pull}23185[23185] - Fix `nested` subfield handling in generated Elasticsearch templates. {issue}23178[23178] {pull}23183[23183] - Fix CPU usage metrics on VMs with dynamic CPU config {pull}23154[23154] +- Fix panic due to unhandled DeletedFinalStateUnknown in k8s OnDelete {pull}23419[23419] *Auditbeat* From 305e2263ea1b216a922ca9aa84202f26e0de8fb7 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 12 Jan 2021 14:12:26 +0200 Subject: [PATCH 6/6] Move check about deleted state to watcher Signed-off-by: chrismark --- .../autodiscover/providers/kubernetes/node.go | 17 +- .../providers/kubernetes/node_test.go | 190 ----------- .../autodiscover/providers/kubernetes/pod.go | 16 +- .../providers/kubernetes/pod_test.go | 306 ------------------ .../providers/kubernetes/service.go | 17 +- .../providers/kubernetes/service_test.go | 169 ---------- libbeat/common/kubernetes/watcher.go | 4 + 7 files changed, 7 insertions(+), 712 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/node.go b/libbeat/autodiscover/providers/kubernetes/node.go index aad9a9d358a..6019364991b 100644 --- a/libbeat/autodiscover/providers/kubernetes/node.go +++ b/libbeat/autodiscover/providers/kubernetes/node.go @@ -25,7 +25,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" k8s "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/builder" "github.com/elastic/beats/v7/libbeat/common" @@ -115,21 +114,7 @@ func (n *node) OnUpdate(obj interface{}) { // OnDelete ensures processing of node objects that are deleted func (n *node) OnDelete(obj interface{}) { n.logger.Debugf("Watcher Node delete: %+v", obj) - node, isNode := obj.(*kubernetes.Node) - // We can get DeletedFinalStateUnknown instead of *kubernetes.Node here and we need to handle that correctly. #23385 - if !isNode { - deletedState, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - n.logger.Errorf("Received unexpected object: %+v", obj) - return - } - node, ok = deletedState.Obj.(*kubernetes.Node) - if !ok { - n.logger.Errorf("DeletedFinalStateUnknown contained non-Node object: %+v", deletedState.Obj) - return - } - } - time.AfterFunc(n.config.CleanupTimeout, func() { n.emit(node, "stop") }) + time.AfterFunc(n.config.CleanupTimeout, func() { n.emit(obj.(*kubernetes.Node), "stop") }) } // GenerateHints creates hints needed for hints builder diff --git a/libbeat/autodiscover/providers/kubernetes/node_test.go b/libbeat/autodiscover/providers/kubernetes/node_test.go index 429a7c8ed23..6eb22b185e1 100644 --- a/libbeat/autodiscover/providers/kubernetes/node_test.go +++ b/libbeat/autodiscover/providers/kubernetes/node_test.go @@ -26,7 +26,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/template" "github.com/elastic/beats/v7/libbeat/common" @@ -346,195 +345,6 @@ func TestEmitEvent_Node(t *testing.T) { } } -func TestNode_OnDelete(t *testing.T) { - name := "metricbeat" - nameUnknown := "metricbeat-unknown" - nodeIP := "192.168.0.1" - uid := "005f3b90-4b9d-12f8-acf0-31020a840133" - UUID, err := uuid.NewV4() - - typeMeta := metav1.TypeMeta{ - Kind: "Node", - APIVersion: "v1", - } - if err != nil { - t.Fatal(err) - } - - tests := []struct { - Message string - Flag string - Node *kubernetes.Node - Expected bus.Event - }{ - { - Message: "Test node stop", - Node: &kubernetes.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: types.UID(uid), - Labels: map[string]string{}, - Annotations: map[string]string{}, - }, - TypeMeta: typeMeta, - Status: v1.NodeStatus{ - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeExternalIP, - Address: nodeIP, - }, - { - Type: v1.NodeInternalIP, - Address: "1.2.3.4", - }, - { - Type: v1.NodeHostName, - Address: "node1", - }, - }, - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionTrue, - }, - }, - }, - }, - Expected: bus.Event{ - "stop": true, - "host": "192.168.0.1", - "id": uid, - "provider": UUID, - "kubernetes": common.MapStr{ - "node": common.MapStr{ - "name": "metricbeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - "hostname": "node1", - }, - "annotations": common.MapStr{}, - }, - "meta": common.MapStr{ - "kubernetes": common.MapStr{ - "node": common.MapStr{ - "name": "metricbeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - "hostname": "node1", - }, - }, - }, - "config": []*common.Config{}, - }, - }, - { - Message: "Test node stop with DeletedFinalStateUnknown", - Node: &kubernetes.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: nameUnknown, - UID: types.UID(uid), - Labels: map[string]string{}, - Annotations: map[string]string{}, - }, - TypeMeta: typeMeta, - Status: v1.NodeStatus{ - Addresses: []v1.NodeAddress{ - { - Type: v1.NodeExternalIP, - Address: nodeIP, - }, - { - Type: v1.NodeInternalIP, - Address: "1.2.3.4", - }, - { - Type: v1.NodeHostName, - Address: "node1", - }, - }, - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionTrue, - }, - }, - }, - }, - Expected: bus.Event{ - "stop": true, - "host": "192.168.0.1", - "id": uid, - "provider": UUID, - "kubernetes": common.MapStr{ - "node": common.MapStr{ - "name": nameUnknown, - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - "hostname": "node1", - }, - "annotations": common.MapStr{}, - }, - "meta": common.MapStr{ - "kubernetes": common.MapStr{ - "node": common.MapStr{ - "name": nameUnknown, - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - "hostname": "node1", - }, - }, - }, - "config": []*common.Config{}, - }, - }, - } - - for _, test := range tests { - t.Run(test.Message, func(t *testing.T) { - mapper, err := template.NewConfigMapper(nil, nil, nil) - if err != nil { - t.Fatal(err) - } - - metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil) - config := defaultConfig() - p := &Provider{ - config: config, - bus: bus.New(logp.NewLogger("bus"), "test"), - templates: mapper, - logger: logp.NewLogger("kubernetes"), - } - - no := &node{ - metagen: metaGen, - config: defaultConfig(), - publish: p.publish, - uuid: UUID, - logger: logp.NewLogger("kubernetes.no"), - } - no.config.CleanupTimeout = 1 * time.Second - p.eventManager = NewMockNodeEventerManager(no) - - listener := p.bus.Subscribe() - - if test.Node.Name == nameUnknown { - deletedState := cache.DeletedFinalStateUnknown{ - Key: "testnode", - Obj: test.Node, - } - no.OnDelete(deletedState) - } else { - no.OnDelete(test.Node) - } - - select { - case event := <-listener.Events(): - assert.Equal(t, test.Expected, event, test.Message) - case <-time.After(4 * time.Second): - if test.Expected != nil { - t.Fatal("Timeout while waiting for event") - } - } - }) - } -} - func NewMockNodeEventerManager(no *node) EventManager { em := &eventerManager{} em.eventer = no diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 6c642908488..02e273663ba 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -163,21 +163,7 @@ func (p *pod) OnUpdate(obj interface{}) { // OnDelete stops pod objects that are deleted func (p *pod) OnDelete(obj interface{}) { p.logger.Debugf("Watcher Pod delete: %+v", obj) - pod, isPod := obj.(*kubernetes.Pod) - // We can get DeletedFinalStateUnknown instead of *kubernetes.Pod here and we need to handle that correctly. #23385 - if !isPod { - deletedState, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - p.logger.Debugf("Received unexpected object: %+v", obj) - return - } - pod, ok = deletedState.Obj.(*kubernetes.Pod) - if !ok { - p.logger.Debugf("DeletedFinalStateUnknown contained non-Pod object: %+v", deletedState.Obj) - return - } - } - time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) + time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) } // GenerateHints creates hints needed for hints builder diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 46839cac981..1673d295ffb 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -26,7 +26,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/template" "github.com/elastic/beats/v7/libbeat/common" @@ -1541,311 +1540,6 @@ func TestEmitEvent(t *testing.T) { } } -func TestPod_OnDelete(t *testing.T) { - name := "filebeat" - nameUnknown := "filebeat-unknown" - namespace := "default" - podIP := "127.0.0.1" - containerID := "docker://foobar" - uid := "005f3b90-4b9d-12f8-acf0-31020a840133" - containerImage := "elastic/filebeat:6.3.0" - node := "node" - cid := "005f3b90-4b9d-12f8-acf0-31020a840133.filebeat" - UUID, err := uuid.NewV4() - if err != nil { - t.Fatal(err) - } - - typeMeta := metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - } - - tests := []struct { - Message string - Flag string - Pod *kubernetes.Pod - Expected []bus.Event - }{ - { - Message: "Test stop pod", - Flag: "stop", - Pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: types.UID(uid), - Namespace: namespace, - Labels: map[string]string{}, - Annotations: map[string]string{}, - }, - TypeMeta: typeMeta, - Status: v1.PodStatus{ - PodIP: podIP, - ContainerStatuses: []kubernetes.PodContainerStatus{ - { - Name: name, - ContainerID: containerID, - }, - }, - }, - Spec: v1.PodSpec{ - NodeName: node, - Containers: []kubernetes.Container{ - { - Image: containerImage, - Name: name, - }, - }, - }, - }, - Expected: []bus.Event{ - { - "stop": true, - "host": "127.0.0.1", - "id": uid, - "provider": UUID, - "ports": common.MapStr{}, - "kubernetes": common.MapStr{ - "pod": common.MapStr{ - "name": "filebeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{ - "name": "node", - }, - "namespace": "default", - "annotations": common.MapStr{}, - }, - "meta": common.MapStr{ - "kubernetes": common.MapStr{ - "namespace": "default", - "pod": common.MapStr{ - "name": "filebeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, "node": common.MapStr{ - "name": "node", - }, - }, - }, - "config": []*common.Config{}, - }, - { - "stop": true, - "host": "127.0.0.1", - "port": 0, - "id": cid, - "provider": UUID, - "kubernetes": common.MapStr{ - "container": common.MapStr{ - "id": "foobar", - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - "runtime": "docker", - }, - "pod": common.MapStr{ - "name": "filebeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{ - "name": "node", - }, - "namespace": "default", - "annotations": common.MapStr{}, - }, - "meta": common.MapStr{ - "kubernetes": common.MapStr{ - "namespace": "default", - "container": common.MapStr{ - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - }, - "pod": common.MapStr{ - "name": "filebeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, "node": common.MapStr{ - "name": "node", - }, - }, - "container": common.MapStr{ - "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, - "runtime": "docker", - "id": "foobar", - }, - }, - "config": []*common.Config{}, - }, - }, - }, - { - Message: "Test stop pod with DeletedFinalStateUnknown", - Flag: "stop", - Pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: nameUnknown, - UID: types.UID(uid), - Namespace: namespace, - Labels: map[string]string{}, - Annotations: map[string]string{}, - }, - TypeMeta: typeMeta, - Status: v1.PodStatus{ - PodIP: podIP, - ContainerStatuses: []kubernetes.PodContainerStatus{ - { - Name: name, - ContainerID: containerID, - }, - }, - }, - Spec: v1.PodSpec{ - NodeName: node, - Containers: []kubernetes.Container{ - { - Image: containerImage, - Name: name, - }, - }, - }, - }, - Expected: []bus.Event{ - { - "stop": true, - "host": "127.0.0.1", - "id": uid, - "provider": UUID, - "ports": common.MapStr{}, - "kubernetes": common.MapStr{ - "pod": common.MapStr{ - "name": nameUnknown, - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{ - "name": "node", - }, - "namespace": "default", - "annotations": common.MapStr{}, - }, - "meta": common.MapStr{ - "kubernetes": common.MapStr{ - "namespace": "default", - "pod": common.MapStr{ - "name": nameUnknown, - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, "node": common.MapStr{ - "name": "node", - }, - }, - }, - "config": []*common.Config{}, - }, - { - "stop": true, - "host": "127.0.0.1", - "port": 0, - "id": cid, - "provider": UUID, - "kubernetes": common.MapStr{ - "container": common.MapStr{ - "id": "foobar", - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - "runtime": "docker", - }, - "pod": common.MapStr{ - "name": nameUnknown, - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{ - "name": "node", - }, - "namespace": "default", - "annotations": common.MapStr{}, - }, - "meta": common.MapStr{ - "kubernetes": common.MapStr{ - "namespace": "default", - "container": common.MapStr{ - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - }, - "pod": common.MapStr{ - "name": nameUnknown, - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, "node": common.MapStr{ - "name": "node", - }, - }, - "container": common.MapStr{ - "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, - "runtime": "docker", - "id": "foobar", - }, - }, - "config": []*common.Config{}, - }, - }, - }, - } - - for _, test := range tests { - t.Run(test.Message, func(t *testing.T) { - mapper, err := template.NewConfigMapper(nil, nil, nil) - if err != nil { - t.Fatal(err) - } - - metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil) - p := &Provider{ - config: defaultConfig(), - bus: bus.New(logp.NewLogger("bus"), "test"), - templates: mapper, - logger: logp.NewLogger("kubernetes"), - } - - pub := &publisher{b: p.bus} - pod := &pod{ - metagen: metaGen, - config: defaultConfig(), - publish: pub.publish, - uuid: UUID, - logger: logp.NewLogger("kubernetes.pod"), - } - pod.config.CleanupTimeout = 1 * time.Second - p.eventManager = NewMockPodEventerManager(pod) - - listener := p.bus.Subscribe() - - if test.Pod.Name == nameUnknown { - deletedState := cache.DeletedFinalStateUnknown{ - Key: "testpod", - Obj: test.Pod, - } - pod.OnDelete(deletedState) - } else { - pod.OnDelete(test.Pod) - } - - for i := 0; i < len(test.Expected); i++ { - select { - case event := <-listener.Events(): - assert.Equalf(t, test.Expected[i], event, "%s/#%d", test.Message, i) - case <-time.After(2 * time.Second): - if test.Expected != nil { - t.Fatalf("Timeout while waiting for event #%d", i) - } - } - } - - select { - case <-listener.Events(): - t.Error("More events received than expected") - default: - } - }) - } - -} - func NewMockPodEventerManager(pod *pod) EventManager { em := &eventerManager{} em.eventer = pod diff --git a/libbeat/autodiscover/providers/kubernetes/service.go b/libbeat/autodiscover/providers/kubernetes/service.go index 158f73ce4d9..f9dff6dc940 100644 --- a/libbeat/autodiscover/providers/kubernetes/service.go +++ b/libbeat/autodiscover/providers/kubernetes/service.go @@ -23,7 +23,6 @@ import ( "github.com/gofrs/uuid" k8s "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/builder" "github.com/elastic/beats/v7/libbeat/common" @@ -114,21 +113,7 @@ func (s *service) OnUpdate(obj interface{}) { // OnDelete ensures processing of service objects that are deleted func (s *service) OnDelete(obj interface{}) { s.logger.Debugf("Watcher service delete: %+v", obj) - service, isNode := obj.(*kubernetes.Service) - // We can get DeletedFinalStateUnknown instead of *kubernetes.Service here and we need to handle that correctly. #23385 - if !isNode { - deletedState, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - s.logger.Errorf("Received unexpected object: %+v", obj) - return - } - service, ok = deletedState.Obj.(*kubernetes.Service) - if !ok { - s.logger.Errorf("DeletedFinalStateUnknown contained non-Service object: %+v", deletedState.Obj) - return - } - } - time.AfterFunc(s.config.CleanupTimeout, func() { s.emit(service, "stop") }) + time.AfterFunc(s.config.CleanupTimeout, func() { s.emit(obj.(*kubernetes.Service), "stop") }) } // GenerateHints creates hints needed for hints builder diff --git a/libbeat/autodiscover/providers/kubernetes/service_test.go b/libbeat/autodiscover/providers/kubernetes/service_test.go index 75c8ac52941..999ba853117 100644 --- a/libbeat/autodiscover/providers/kubernetes/service_test.go +++ b/libbeat/autodiscover/providers/kubernetes/service_test.go @@ -26,7 +26,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/template" "github.com/elastic/beats/v7/libbeat/common" @@ -430,174 +429,6 @@ func TestEmitEvent_Service(t *testing.T) { } } -func TestService_OnDelete(t *testing.T) { - name := "metricbeat" - nameUnknown := "metricbeat-unknown" - namespace := "default" - clusterIP := "192.168.0.1" - uid := "005f3b90-4b9d-12f8-acf0-31020a840133" - UUID, err := uuid.NewV4() - if err != nil { - t.Fatal(err) - } - - typeMeta := metav1.TypeMeta{ - Kind: "Service", - APIVersion: "v1", - } - - tests := []struct { - Message string - Flag string - Service *kubernetes.Service - Expected bus.Event - }{ - { - Message: "Test service stop", - Flag: "stop", - Service: &kubernetes.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: types.UID(uid), - Namespace: namespace, - Labels: map[string]string{}, - Annotations: map[string]string{}, - }, - TypeMeta: typeMeta, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Name: "http", - Port: 8080, - }, - }, - ClusterIP: clusterIP, - }, - }, - Expected: bus.Event{ - "stop": true, - "host": "192.168.0.1", - "id": uid, - "provider": UUID, - "kubernetes": common.MapStr{ - "service": common.MapStr{ - "name": "metricbeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "namespace": "default", - "annotations": common.MapStr{}, - }, - "meta": common.MapStr{ - "kubernetes": common.MapStr{ - "namespace": "default", - "service": common.MapStr{ - "name": "metricbeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - }, - }, - "config": []*common.Config{}, - }, - }, - { - Message: "Test service stop with DeletedFinalStateUnknown", - Flag: "stop", - Service: &kubernetes.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: nameUnknown, - UID: types.UID(uid), - Namespace: namespace, - Labels: map[string]string{}, - Annotations: map[string]string{}, - }, - TypeMeta: typeMeta, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Name: "http", - Port: 8080, - }, - }, - ClusterIP: clusterIP, - }, - }, - Expected: bus.Event{ - "stop": true, - "host": "192.168.0.1", - "id": uid, - "provider": UUID, - "kubernetes": common.MapStr{ - "service": common.MapStr{ - "name": nameUnknown, - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "namespace": "default", - "annotations": common.MapStr{}, - }, - "meta": common.MapStr{ - "kubernetes": common.MapStr{ - "namespace": "default", - "service": common.MapStr{ - "name": nameUnknown, - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - }, - }, - "config": []*common.Config{}, - }, - }, - } - - for _, test := range tests { - t.Run(test.Message, func(t *testing.T) { - mapper, err := template.NewConfigMapper(nil, nil, nil) - if err != nil { - t.Fatal(err) - } - - metaGen := metadata.NewServiceMetadataGenerator(common.NewConfig(), nil, nil) - - p := &Provider{ - config: defaultConfig(), - bus: bus.New(logp.NewLogger("bus"), "test"), - templates: mapper, - logger: logp.NewLogger("kubernetes"), - } - - service := &service{ - metagen: metaGen, - config: defaultConfig(), - publish: p.publish, - uuid: UUID, - logger: logp.NewLogger("kubernetes.service"), - } - service.config.CleanupTimeout = 1 * time.Second - p.eventManager = NewMockServiceEventerManager(service) - - listener := p.bus.Subscribe() - - if test.Service.Name == nameUnknown { - deletedState := cache.DeletedFinalStateUnknown{ - Key: "testsvc", - Obj: test.Service, - } - service.OnDelete(deletedState) - } else { - service.OnDelete(test.Service) - } - - select { - case event := <-listener.Events(): - assert.Equal(t, test.Expected, event, test.Message) - case <-time.After(4 * time.Second): - if test.Expected != nil { - t.Fatal("Timeout while waiting for event") - } - } - }) - } -} - func NewMockServiceEventerManager(svc *service) EventManager { em := &eventerManager{} em.eventer = svc diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index eb7f330a64e..f72a98de0a7 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -199,6 +199,10 @@ func (w *watcher) enqueue(obj interface{}, state string) { if err != nil { return } + if deleted, ok := obj.(cache.DeletedFinalStateUnknown); ok { + w.logger.Debugf("Enqueued DeletedFinalStateUnknown contained object: %+v", deleted.Obj) + obj = deleted.Obj + } w.queue.Add(&item{key, obj, state}) }